From a57200ebb29d56eda8cf2809975eb5e5d4ca10c8 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 29 Apr 2026 21:18:39 +0800 Subject: [PATCH 1/8] [python] Fix limit push-down: add non-raw splits to plan, use merged_row_count for budget MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two divergences from Java's DataTableBatchScan.applyPushDownLimit(): 1) Non-raw_convertible splits were skipped entirely by the loop body — they never entered ``limited_splits``. As a consequence, when a non-raw split appeared BEFORE a raw split that meets the limit, the early-return omitted the non-raw split from the plan altogether. Java unconditionally adds every visited split. 2) The accumulator used ``split.row_count`` (file-level pre-DV upper bound) where Java uses ``split.partialMergedRowCount()`` — file row_count *minus* any deletion-vector cardinality already recorded in the manifest. Python has the same value via ``DataSplit.merged_row_count()``, but ``_apply_push_down_limit`` wasn't using it, so on DV-aware raw splits the accumulator over-counted and the early-return fired before the reader could actually produce ``limit`` rows. The two divergences interact. With ``[non-raw, raw]`` and a tight limit, (1) silently drops the non-raw partition's data. With ``[raw_with_DV, non-raw, ...]`` and a limit between the post-DV and pre-DV row counts, (2) makes the loop early-return on the DV split alone, leaving the reader with fewer rows than it could otherwise produce by also draining the trailing non-raw splits. Fix: for split in splits: limited_splits.append(split) # add unconditionally if split.raw_convertible: merged = split.merged_row_count() scanned_row_count += merged if merged is not None else split.row_count if scanned_row_count >= self.limit: return limited_splits return splits The ``merged is not None`` fallback to ``split.row_count`` keeps the previous behaviour for layouts where the merged count cannot be derived from the manifest (older snapshots, some data-evolution shapes); using the pre-DV upper bound there is still strictly better than the alternative of skipping that split's contribution to the budget. Tests: test_limit_drops_non_raw_split_after_raw_budget_is_met (new): deterministic ``[non-raw (p1), raw (p2)]`` plan. Pre-fix (master) fails with ``1 != 2``: ``limited_splits=[raw]``, p1's data is silently dropped. Post-fix returns both splits. ApplyPushDownLimitUnitTest (new): synthetic-split unit tests for the accumulator, since pypaimon's writer doesn't compact L0 → L1+ and the DV-enabled PK read path skips L0, so a true DV-aware raw_convertible split is hard to produce from a pure-Python end- to-end fixture. Cases: * test_dv_aware_accumulator_uses_merged_row_count — ``[raw(row_count=10, merged=4), non-raw, non-raw]`` + limit=5. Pre-fix: early-returns after the raw split → 1 split. Post-fix: 4 < 5 keeps walking → 3 splits. * test_accumulator_falls_back_to_row_count_when_merged_unavailable — guards the ``merged is None`` fallback path. * test_no_raw_splits_falls_through_to_full_list — all-non-raw falls through to the loop's terminal ``return splits``. * test_empty_splits_returns_empty / test_no_limit_returns_input_unchanged — boundary conditions. --- .../pypaimon/read/scanner/file_scanner.py | 24 +- .../tests/reader_split_generator_test.py | 216 ++++++++++++++++++ 2 files changed, 238 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index ffbd83daf0da..1885c3856178 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -371,10 +371,30 @@ def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: scanned_row_count = 0 limited_splits = [] + # Keep every split. Only ``raw_convertible`` splits contribute to + # the row-count accumulator. Mirrors Java's + # DataTableBatchScan.applyPushDownLimit() — Java accumulates + # ``split.partialMergedRowCount()``, the file-level row count *minus* + # any deletion-vector cardinality already recorded in the manifest. + # Python has the same value via ``DataSplit.merged_row_count()``; + # the previous Python code accumulated ``split.row_count`` which is + # the pre-DV upper bound and over-counts when DV is on, causing the + # early return to fire before the reader can actually produce the + # budget. When the merged count is unavailable (older manifests / + # some data-evolution layouts where ``first_row_id`` is not + # populated) fall back to ``split.row_count``: the file-level upper + # bound is the previous behaviour and is still safer than skipping + # the split's contribution entirely. + # Non-raw_convertible splits cannot be cheaply counted ahead of + # read, so we keep them around so the reader still has data to + # drain. for split in splits: + limited_splits.append(split) if split.raw_convertible: - limited_splits.append(split) - scanned_row_count += split.row_count + merged = split.merged_row_count() + scanned_row_count += ( + merged if merged is not None else split.row_count + ) if scanned_row_count >= self.limit: return limited_splits diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py index 53819aebd5b8..8587c1315e46 100644 --- a/paimon-python/pypaimon/tests/reader_split_generator_test.py +++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py @@ -329,6 +329,222 @@ def test_sliced_split_merged_row_count(self): # Note: SlicedSplit may or may not be created depending on data distribution # This test ensures that if SlicedSplit is created, merged_row_count() works correctly + def test_limit_drops_non_raw_split_after_raw_budget_is_met(self): + """Reproducer for the limit-pushdown bug. + + The pre-fix loop in ``FileScanner._apply_push_down_limit`` was:: + + for split in splits: + if split.raw_convertible: + limited_splits.append(split) + scanned_row_count += split.row_count + if scanned_row_count >= self.limit: + return limited_splits # <-- early return drops + # any subsequent non-raw splits + return splits + + So the bug only triggers when: + (a) at least one raw_convertible split's ``row_count`` reaches the + limit, AND + (b) at least one *more* non-raw_convertible split exists in the + same plan. + + If every split is non-raw the loop never enters the body and the + fallback ``return splits`` masks the bug; if every split is raw + and row_counts sum to the limit, dropping the rest is correct + anyway. So a proper regression test must construct a *mix* of + raw and non-raw splits in a single plan and assert that + ``len(limited_splits) >= len(raw_convertible_splits) + 1`` — + i.e. at least one non-raw split survives the limit pushdown. + + Build that mix using two partitions with a single bucket each: + * partition ``p1`` — one batch, one file → raw_convertible split + * partition ``p2`` — two overlapping batches on the same PK → + non-raw_convertible split (needs merge-on-read) + + With ``bucket=1`` per partition the layout is deterministic. + """ + pa_schema = pa.schema([ + pa.field('id', pa.int64(), nullable=False), + pa.field('dt', pa.string(), nullable=False), + ('value', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=['dt'], + primary_keys=['id', 'dt'], + options={'bucket': '1', 'merge-engine': 'deduplicate'}, + ) + self.catalog.create_table( + 'default.test_limit_mix_raw_nonraw', schema, False) + table = self.catalog.get_table('default.test_limit_mix_raw_nonraw') + + def _write(rows): + wb = table.new_batch_write_builder() + w = wb.new_write() + c = wb.new_commit() + try: + w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema)) + c.commit(w.prepare_commit()) + finally: + w.close() + c.close() + + # Order matters. The buggy loop short-circuits as soon as one + # raw_convertible split's row_count meets the limit, so a + # non-raw split that appears AFTER that point is dropped no + # matter what (matching Java). The bug we want to catch is the + # one where a non-raw split appearing BEFORE the raw budget is + # met gets silently skipped — the iteration would never even + # add it to ``limited_splits``. + # ``PrimaryKeyTableSplitGenerator`` walks partitions in order, + # so making ``p1`` the non-raw partition guarantees the bad + # ordering [non-raw (p1), raw (p2)]. + # p1 — two overlapping writes on the same PK → non-raw_convertible. + _write([{'id': 1, 'dt': 'p1', 'value': 'p1-a'}]) + _write([{'id': 1, 'dt': 'p1', 'value': 'p1-b'}]) + # p2 — single write → raw_convertible. + _write([{'id': 2, 'dt': 'p2', 'value': 'p2-a'}]) + + all_splits = table.new_read_builder().new_scan().plan().splits() + raw_splits = [s for s in all_splits if s.raw_convertible] + non_raw_splits = [s for s in all_splits if not s.raw_convertible] + self.assertGreaterEqual( + len(raw_splits), 1, + "test fixture must produce at least one raw_convertible split" + " — the bug only triggers in a mixed plan", + ) + self.assertGreaterEqual( + len(non_raw_splits), 1, + "test fixture must produce at least one non-raw_convertible split" + " — without one the early-return branch in the pre-fix code" + " never drops anything", + ) + + # Limit small enough that a single raw split's row_count already + # meets the budget. Pre-fix: scanner returns just the raw split, + # silently drops every non-raw split. Post-fix: every split is + # carried through; only the raw rows count toward the budget. + limited_splits = table.new_read_builder().with_limit(1).new_scan().plan().splits() + + self.assertEqual( + len(limited_splits), len(all_splits), + "limit pushdown dropped {} non-raw_convertible split(s); " + "they must be retained because their row_count is the " + "pre-merge file count and cannot be used as the limit budget" + .format(len(all_splits) - len(limited_splits)), + ) + # Spot-check both buckets are still represented. + retained_raw = sum(1 for s in limited_splits if s.raw_convertible) + retained_non_raw = sum(1 for s in limited_splits if not s.raw_convertible) + self.assertGreaterEqual(retained_raw, 1) + self.assertGreaterEqual(retained_non_raw, 1) + + +class ApplyPushDownLimitUnitTest(unittest.TestCase): + """Direct, mock-driven coverage of ``FileScanner._apply_push_down_limit``. + + Pypaimon's writer doesn't compact L0 → L1+, and the DV-enabled + PK-table read path skips L0 files, so a true DV-aware + ``raw_convertible`` split (where ``merged_row_count < row_count``) + is hard to produce from a pure-Python end-to-end fixture. The + accumulator semantics, however, are a simple loop on the splits + list — exercise it directly with synthetic split stand-ins. + + These cases pin down the correctness contract without depending on + storage layout: the accumulator must use ``merged_row_count`` + (matching Java's ``partialMergedRowCount``) and must keep every + split it has visited up to and including the one that meets the + budget. + """ + + @staticmethod + def _apply(splits, limit): + from pypaimon.read.scanner.file_scanner import FileScanner + + # Stand in for ``self`` — only ``self.limit`` is read by the method. + class _FakeScanner: + pass + + scanner = _FakeScanner() + scanner.limit = limit + return FileScanner._apply_push_down_limit(scanner, splits) + + @staticmethod + def _split(raw_convertible, row_count, merged_row_count): + class _FakeSplit: + pass + + s = _FakeSplit() + s.raw_convertible = raw_convertible + s.row_count = row_count + s._merged = merged_row_count + + def _merged_fn(): + return s._merged + + s.merged_row_count = _merged_fn + return s + + def test_dv_aware_accumulator_uses_merged_row_count(self): + """[raw(row_count=10, dv→merged=4), non-raw, non-raw] + limit=5. + + Pre-fix accumulator (``+= row_count``): the raw split's pre-DV + count of 10 already meets ``limit=5``, the loop early-returns + with just ``[raw]`` and the two non-raw splits are dropped. The + reader can then only see 4 rows from the DV split — silently + less than ``limit``. + + Post-fix accumulator (``+= merged_row_count``): only 4 rows of + budget after the raw split, so 4 < 5; the loop keeps walking, + adds the two non-raw splits without changing the accumulator, + and falls through to ``return splits`` with all three. The + reader then has enough material across three splits to produce + ``limit`` rows. + """ + s_raw = self._split(raw_convertible=True, row_count=10, merged_row_count=4) + s_nr1 = self._split(raw_convertible=False, row_count=10, merged_row_count=None) + s_nr2 = self._split(raw_convertible=False, row_count=10, merged_row_count=None) + + result = self._apply([s_raw, s_nr1, s_nr2], limit=5) + self.assertEqual( + len(result), 3, + "merged_row_count accumulator must NOT early-return after a " + "DV-aware raw split whose post-DV count is below the limit; " + "got {}".format([id(s) for s in result]), + ) + + def test_accumulator_falls_back_to_row_count_when_merged_unavailable(self): + """``merged_row_count`` returns ``None`` for layouts where the + DV cardinality / data-evolution range isn't recorded yet. The + accumulator must fall back to ``row_count`` rather than treat + ``None`` as zero (which would never trigger the early return) + or skip the split entirely. With a single raw split whose + ``merged_row_count`` is unavailable but ``row_count`` already + meets the budget, the loop should early-return with just that + split.""" + s = self._split(raw_convertible=True, row_count=10, merged_row_count=None) + result = self._apply([s], limit=5) + self.assertEqual(len(result), 1) + self.assertIs(result[0], s) + + def test_no_raw_splits_falls_through_to_full_list(self): + """No raw splits → accumulator never moves → loop completes → + fallback returns the full list (matching Java's behaviour for + the all-non-raw case).""" + s1 = self._split(raw_convertible=False, row_count=10, merged_row_count=None) + s2 = self._split(raw_convertible=False, row_count=10, merged_row_count=None) + result = self._apply([s1, s2], limit=5) + self.assertEqual(result, [s1, s2]) + + def test_empty_splits_returns_empty(self): + self.assertEqual(self._apply([], limit=5), []) + + def test_no_limit_returns_input_unchanged(self): + s = self._split(raw_convertible=True, row_count=10, merged_row_count=10) + result = self._apply([s], limit=None) + self.assertEqual(result, [s]) + if __name__ == '__main__': unittest.main() From 3051ff69ae4c034991ee54f71e1c7ffddea57e78 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 21:29:29 +0800 Subject: [PATCH 2/8] [python] Tighten _apply_push_down_limit to mirror Java line-for-line MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review feedback (XiaoHongbo-Hope on PR #7742): the previous fix accumulated ``merged_row_count() if not None else split.row_count`` under a ``raw_convertible`` gate while unconditionally adding every split to ``limited_splits``. That was a Python-only behavior and diverged from Java's ``DataTableBatchScan.applyPushDownLimit`` despite the PR description claiming "mirrors Java line-for-line". Three concrete divergences: 1. Gate: Java uses ``mergedRowCount.isPresent()``, we used ``raw_convertible``. 2. Append timing: Java only adds splits whose merged count is known; we added every split regardless. 3. Fallback: Java has none; we fell back to ``split.row_count`` when ``merged_row_count()`` returned None. The single behavioral fix this PR needs to deliver is the accumulator source — replacing ``split.row_count`` (DV-blind, over-counts when DV is on) with ``merged_row_count()`` (DV-aware). Java already does exactly this. Drop the extra divergences so the loop reads as a direct port of Java: for split in splits: merged = split.merged_row_count() if merged is not None: limited_splits.append(split) scanned_row_count += merged if scanned_row_count >= self.limit: return limited_splits return splits Test adjustments: - Removed the integration test ``test_limit_drops_non_raw_split_after_ raw_budget_is_met``. Its expectation ("non-raw split survives the limit pushdown") was based on the now-reverted unconditional-append behavior. Java drops non-raw splits after the budget is met — matching this is now correct, so the test is no longer a regression reproducer. - Renamed ``test_accumulator_falls_back_to_row_count_when_merged_ unavailable`` to ``test_accumulator_skips_splits_with_unknown_ merged_count`` and rewrote the docstring to describe the actual Java-aligned behavior (skip + fall through). - Kept ``test_dv_aware_accumulator_uses_merged_row_count`` as the master-vs-fix reproducer: master accumulates row_count=10 ≥ limit=5 and early-returns ``[raw]`` (1 split); fix accumulates merged=4 < 5, skips the two non-raw splits, falls through to ``return splits`` with all 3. Verified by swapping the file body to master's version — this test fails (1 != 3) on master and passes after the fix. Lint: flake8 clean. Tests: 10/10 in reader_split_generator_test.py. --- .../pypaimon/read/scanner/file_scanner.py | 50 +++---- .../tests/reader_split_generator_test.py | 130 ++---------------- 2 files changed, 37 insertions(+), 143 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index 1885c3856178..e38321d3c4e3 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -371,30 +371,34 @@ def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: scanned_row_count = 0 limited_splits = [] - # Keep every split. Only ``raw_convertible`` splits contribute to - # the row-count accumulator. Mirrors Java's - # DataTableBatchScan.applyPushDownLimit() — Java accumulates - # ``split.partialMergedRowCount()``, the file-level row count *minus* - # any deletion-vector cardinality already recorded in the manifest. - # Python has the same value via ``DataSplit.merged_row_count()``; - # the previous Python code accumulated ``split.row_count`` which is - # the pre-DV upper bound and over-counts when DV is on, causing the - # early return to fire before the reader can actually produce the - # budget. When the merged count is unavailable (older manifests / - # some data-evolution layouts where ``first_row_id`` is not - # populated) fall back to ``split.row_count``: the file-level upper - # bound is the previous behaviour and is still safer than skipping - # the split's contribution entirely. - # Non-raw_convertible splits cannot be cheaply counted ahead of - # read, so we keep them around so the reader still has data to - # drain. + # Line-for-line port of Java + # ``DataTableBatchScan.applyPushDownLimit`` (paimon-core/.../source/ + # DataTableBatchScan.java:128-165): + # + # for each split: + # if mergedRowCount.isPresent(): + # limitedSplits.add(split) + # scanned += mergedRowCount.getAsLong() + # if scanned >= limit: return limitedSplits + # return result // == original splits + # + # The previous Python code accumulated ``split.row_count`` (the + # pre-DV upper bound) and over-counted when DV was enabled, + # causing the early return to fire before the reader could + # actually produce ``limit`` rows. Java avoids that by using + # ``mergedRowCount`` (DV-aware); we use the same source via + # ``DataSplit.merged_row_count()``. + # + # Splits whose merged count is unknown (non-raw, or data-evolution + # layouts where ``first_row_id`` is missing) are skipped — they + # cannot meaningfully contribute to the budget. They still reach + # the reader via the fall-through ``return splits`` when the + # accumulator never reaches the limit, mirroring Java's behaviour. for split in splits: - limited_splits.append(split) - if split.raw_convertible: - merged = split.merged_row_count() - scanned_row_count += ( - merged if merged is not None else split.row_count - ) + merged = split.merged_row_count() + if merged is not None: + limited_splits.append(split) + scanned_row_count += merged if scanned_row_count >= self.limit: return limited_splits diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py index 8587c1315e46..a912ae4822d6 100644 --- a/paimon-python/pypaimon/tests/reader_split_generator_test.py +++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py @@ -329,118 +329,6 @@ def test_sliced_split_merged_row_count(self): # Note: SlicedSplit may or may not be created depending on data distribution # This test ensures that if SlicedSplit is created, merged_row_count() works correctly - def test_limit_drops_non_raw_split_after_raw_budget_is_met(self): - """Reproducer for the limit-pushdown bug. - - The pre-fix loop in ``FileScanner._apply_push_down_limit`` was:: - - for split in splits: - if split.raw_convertible: - limited_splits.append(split) - scanned_row_count += split.row_count - if scanned_row_count >= self.limit: - return limited_splits # <-- early return drops - # any subsequent non-raw splits - return splits - - So the bug only triggers when: - (a) at least one raw_convertible split's ``row_count`` reaches the - limit, AND - (b) at least one *more* non-raw_convertible split exists in the - same plan. - - If every split is non-raw the loop never enters the body and the - fallback ``return splits`` masks the bug; if every split is raw - and row_counts sum to the limit, dropping the rest is correct - anyway. So a proper regression test must construct a *mix* of - raw and non-raw splits in a single plan and assert that - ``len(limited_splits) >= len(raw_convertible_splits) + 1`` — - i.e. at least one non-raw split survives the limit pushdown. - - Build that mix using two partitions with a single bucket each: - * partition ``p1`` — one batch, one file → raw_convertible split - * partition ``p2`` — two overlapping batches on the same PK → - non-raw_convertible split (needs merge-on-read) - - With ``bucket=1`` per partition the layout is deterministic. - """ - pa_schema = pa.schema([ - pa.field('id', pa.int64(), nullable=False), - pa.field('dt', pa.string(), nullable=False), - ('value', pa.string()), - ]) - schema = Schema.from_pyarrow_schema( - pa_schema, - partition_keys=['dt'], - primary_keys=['id', 'dt'], - options={'bucket': '1', 'merge-engine': 'deduplicate'}, - ) - self.catalog.create_table( - 'default.test_limit_mix_raw_nonraw', schema, False) - table = self.catalog.get_table('default.test_limit_mix_raw_nonraw') - - def _write(rows): - wb = table.new_batch_write_builder() - w = wb.new_write() - c = wb.new_commit() - try: - w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema)) - c.commit(w.prepare_commit()) - finally: - w.close() - c.close() - - # Order matters. The buggy loop short-circuits as soon as one - # raw_convertible split's row_count meets the limit, so a - # non-raw split that appears AFTER that point is dropped no - # matter what (matching Java). The bug we want to catch is the - # one where a non-raw split appearing BEFORE the raw budget is - # met gets silently skipped — the iteration would never even - # add it to ``limited_splits``. - # ``PrimaryKeyTableSplitGenerator`` walks partitions in order, - # so making ``p1`` the non-raw partition guarantees the bad - # ordering [non-raw (p1), raw (p2)]. - # p1 — two overlapping writes on the same PK → non-raw_convertible. - _write([{'id': 1, 'dt': 'p1', 'value': 'p1-a'}]) - _write([{'id': 1, 'dt': 'p1', 'value': 'p1-b'}]) - # p2 — single write → raw_convertible. - _write([{'id': 2, 'dt': 'p2', 'value': 'p2-a'}]) - - all_splits = table.new_read_builder().new_scan().plan().splits() - raw_splits = [s for s in all_splits if s.raw_convertible] - non_raw_splits = [s for s in all_splits if not s.raw_convertible] - self.assertGreaterEqual( - len(raw_splits), 1, - "test fixture must produce at least one raw_convertible split" - " — the bug only triggers in a mixed plan", - ) - self.assertGreaterEqual( - len(non_raw_splits), 1, - "test fixture must produce at least one non-raw_convertible split" - " — without one the early-return branch in the pre-fix code" - " never drops anything", - ) - - # Limit small enough that a single raw split's row_count already - # meets the budget. Pre-fix: scanner returns just the raw split, - # silently drops every non-raw split. Post-fix: every split is - # carried through; only the raw rows count toward the budget. - limited_splits = table.new_read_builder().with_limit(1).new_scan().plan().splits() - - self.assertEqual( - len(limited_splits), len(all_splits), - "limit pushdown dropped {} non-raw_convertible split(s); " - "they must be retained because their row_count is the " - "pre-merge file count and cannot be used as the limit budget" - .format(len(all_splits) - len(limited_splits)), - ) - # Spot-check both buckets are still represented. - retained_raw = sum(1 for s in limited_splits if s.raw_convertible) - retained_non_raw = sum(1 for s in limited_splits if not s.raw_convertible) - self.assertGreaterEqual(retained_raw, 1) - self.assertGreaterEqual(retained_non_raw, 1) - - class ApplyPushDownLimitUnitTest(unittest.TestCase): """Direct, mock-driven coverage of ``FileScanner._apply_push_down_limit``. @@ -514,15 +402,17 @@ def test_dv_aware_accumulator_uses_merged_row_count(self): "got {}".format([id(s) for s in result]), ) - def test_accumulator_falls_back_to_row_count_when_merged_unavailable(self): + def test_accumulator_skips_splits_with_unknown_merged_count(self): """``merged_row_count`` returns ``None`` for layouts where the - DV cardinality / data-evolution range isn't recorded yet. The - accumulator must fall back to ``row_count`` rather than treat - ``None`` as zero (which would never trigger the early return) - or skip the split entirely. With a single raw split whose - ``merged_row_count`` is unavailable but ``row_count`` already - meets the budget, the loop should early-return with just that - split.""" + DV cardinality / data-evolution range isn't recorded yet (e.g. + a non-raw split, or a raw split missing DV cardinality in the + manifest). Such splits cannot meaningfully contribute to the + budget — Java's ``applyPushDownLimit`` skips them in the + accumulator loop and falls through to the full split list when + the loop completes without reaching the limit. We mirror that: + with a single split whose ``merged_row_count`` is unavailable, + the loop never accumulates anything and we return the input + unchanged.""" s = self._split(raw_convertible=True, row_count=10, merged_row_count=None) result = self._apply([s], limit=5) self.assertEqual(len(result), 1) From 7082f11755d0fc163271fd05bf5e2c9b9bc911ae Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 21:30:01 +0800 Subject: [PATCH 3/8] [python] Fix flake8 E302 in reader_split_generator_test.py --- paimon-python/pypaimon/tests/reader_split_generator_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py index a912ae4822d6..0890f9f73d1a 100644 --- a/paimon-python/pypaimon/tests/reader_split_generator_test.py +++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py @@ -329,6 +329,7 @@ def test_sliced_split_merged_row_count(self): # Note: SlicedSplit may or may not be created depending on data distribution # This test ensures that if SlicedSplit is created, merged_row_count() works correctly + class ApplyPushDownLimitUnitTest(unittest.TestCase): """Direct, mock-driven coverage of ``FileScanner._apply_push_down_limit``. From 68656b74d1e36c0306935ca86527273ce9bee798 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 22:13:50 +0800 Subject: [PATCH 4/8] [python] Add hasNonPartitionFilter short-circuit in _apply_push_down_limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Line-by-line audit against Java ``DataTableBatchScan.applyPushDownLimit`` (paimon-core/.../source/ DataTableBatchScan.java:128-165) caught one missing branch: Java L129: if (pushDownLimit == null || hasNonPartitionFilter()) return Optional.empty(); Java skips limit pushdown entirely when the predicate references any non-partition column, because per-split row counts (the accumulator input below) are pre-filter and would over-count against the actual filtered output — pushing the early-return budget too low and giving the reader fewer rows than the user asked for. Add the equivalent short-circuit to Python: a new private helper ``_has_non_partition_filter()`` mirrors Java's ``SnapshotReaderImpl.hasNonPartitionFilter`` (lines 235-248) using the existing ``_get_all_fields`` predicate-leaf walker. When the predicate references any column outside ``partition_keys`` the limit-pushdown loop is skipped and the splits are returned untouched. Tests: - New ``test_non_partition_filter_short_circuits_pushdown`` in ApplyPushDownLimitUnitTest covers the new branch. - Existing 4 unit tests carry through unchanged (the new short-circuit doesn't trip when ``has_non_partition_filter=False``). Inline comments now annotate every Java line we mirror (L129, L138, L146, L147-163, L164) so a reviewer can verify the port at a glance. --- .../pypaimon/read/scanner/file_scanner.py | 75 ++++++++++++------- .../tests/reader_split_generator_test.py | 22 +++++- 2 files changed, 69 insertions(+), 28 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index e38321d3c4e3..fd70f39ec1bc 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -31,7 +31,8 @@ from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions from pypaimon.read.plan import Plan -from pypaimon.read.push_down_utils import (remove_row_id_filter, +from pypaimon.read.push_down_utils import (_get_all_fields, + remove_row_id_filter, trim_and_transform_predicate) from pypaimon.read.scanner.append_table_split_generator import \ AppendTableSplitGenerator @@ -366,34 +367,37 @@ def with_global_index_result(self, result) -> 'FileScanner': return self def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: + # Line-for-line port of Java + # ``DataTableBatchScan.applyPushDownLimit`` + # (paimon-core/.../source/DataTableBatchScan.java:128-165). + + # Java L129-131: if (pushDownLimit == null || hasNonPartitionFilter()) + # return Optional.empty(); // skip pushdown + # Java skips limit pushdown entirely whenever the predicate touches + # any non-partition column, because per-split row counts (which the + # accumulator below sums) are pre-filter and would over-count + # against the actual filtered output. Mirror that here: when the + # predicate references a column outside ``partition_keys`` we + # bail out and return the splits untouched. if self.limit is None: return splits - scanned_row_count = 0 - limited_splits = [] + if self._has_non_partition_filter(): + return splits - # Line-for-line port of Java - # ``DataTableBatchScan.applyPushDownLimit`` (paimon-core/.../source/ - # DataTableBatchScan.java:128-165): - # - # for each split: - # if mergedRowCount.isPresent(): - # limitedSplits.add(split) - # scanned += mergedRowCount.getAsLong() - # if scanned >= limit: return limitedSplits - # return result // == original splits - # - # The previous Python code accumulated ``split.row_count`` (the - # pre-DV upper bound) and over-counted when DV was enabled, - # causing the early return to fire before the reader could - # actually produce ``limit`` rows. Java avoids that by using - # ``mergedRowCount`` (DV-aware); we use the same source via - # ``DataSplit.merged_row_count()``. - # - # Splits whose merged count is unknown (non-raw, or data-evolution - # layouts where ``first_row_id`` is missing) are skipped — they - # cannot meaningfully contribute to the budget. They still reach - # the reader via the fall-through ``return splits`` when the - # accumulator never reaches the limit, mirroring Java's behaviour. + # Java L138, L146: scannedRowCount=0; limitedSplits = new ArrayList<>(). + scanned_row_count = 0 + limited_splits: List[DataSplit] = [] + + # Java L147-163: iterate, only count splits whose mergedRowCount + # is present, accumulate that DV-aware count, early-return the + # collected sublist as soon as the budget is met. The previous + # Python code accumulated ``split.row_count`` (DV-blind upper + # bound) and over-counted when DV was on, causing the early + # return to fire before the reader could actually produce the + # limit rows; Java uses ``mergedRowCount`` for exactly this + # reason. Splits whose merged count is unknown (non-raw, or + # data-evolution layouts where ``first_row_id`` is missing) are + # skipped — same as Java's ``isPresent()`` gate. for split in splits: merged = split.merged_row_count() if merged is not None: @@ -402,8 +406,27 @@ def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: if scanned_row_count >= self.limit: return limited_splits + # Java L164: return Optional.of(result); // result wraps the + # original splits when the loop never reaches the budget. return splits + def _has_non_partition_filter(self) -> bool: + """Mirror Java ``SnapshotReaderImpl.hasNonPartitionFilter`` + (paimon-core/.../source/snapshot/SnapshotReaderImpl.java:235-248): + when the user-provided predicate references any column outside + the partition keys, the non-partition portion is non-empty and + the flag is true. The Java ``applyPushDownLimit`` short-circuits + on this flag so per-split row counts (pre-filter) never act as + the limit budget. + + Python equivalent: collect the predicate's referenced field set + and check whether it has any element outside ``partition_keys``. + """ + if self.predicate is None: + return False + partition_keys = set(self.table.partition_keys or []) + return not _get_all_fields(self.predicate).issubset(partition_keys) + def _filter_manifest_file(self, file: ManifestFileMeta) -> bool: if not self.partition_key_predicate: return True diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py index 0890f9f73d1a..f00c011df1e0 100644 --- a/paimon-python/pypaimon/tests/reader_split_generator_test.py +++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py @@ -348,15 +348,18 @@ class ApplyPushDownLimitUnitTest(unittest.TestCase): """ @staticmethod - def _apply(splits, limit): + def _apply(splits, limit, has_non_partition_filter=False): from pypaimon.read.scanner.file_scanner import FileScanner - # Stand in for ``self`` — only ``self.limit`` is read by the method. + # Stand in for ``self`` — only ``self.limit`` and the + # ``_has_non_partition_filter`` short-circuit are exercised by + # the method under test. class _FakeScanner: pass scanner = _FakeScanner() scanner.limit = limit + scanner._has_non_partition_filter = lambda: has_non_partition_filter return FileScanner._apply_push_down_limit(scanner, splits) @staticmethod @@ -436,6 +439,21 @@ def test_no_limit_returns_input_unchanged(self): result = self._apply([s], limit=None) self.assertEqual(result, [s]) + def test_non_partition_filter_short_circuits_pushdown(self): + """Java ``applyPushDownLimit`` (DataTableBatchScan.java:129) bails + out when ``hasNonPartitionFilter()`` is true: per-split row counts + are pre-filter, so summing them against ``limit`` would + over-count when the predicate further filters rows. The Python + port must do the same — return the splits untouched and let the + reader apply both filter and limit.""" + s_raw = self._split(raw_convertible=True, row_count=10, merged_row_count=10) + # With a non-partition filter, the early-return branch trips + # before we even look at merged_row_count, so the budget never + # narrows the plan. + result = self._apply( + [s_raw, s_raw, s_raw], limit=5, has_non_partition_filter=True) + self.assertEqual(len(result), 3) + if __name__ == '__main__': unittest.main() From 3878ceac4cba3078808256f1cdd2a974c22fe545 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 30 Apr 2026 22:22:40 +0800 Subject: [PATCH 5/8] [python] Trim per-line Java-line annotations to a single docstring reference --- .../pypaimon/read/scanner/file_scanner.py | 53 ++++++------------- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index fd70f39ec1bc..4aa28ece0bb8 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -367,37 +367,26 @@ def with_global_index_result(self, result) -> 'FileScanner': return self def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: - # Line-for-line port of Java - # ``DataTableBatchScan.applyPushDownLimit`` - # (paimon-core/.../source/DataTableBatchScan.java:128-165). - - # Java L129-131: if (pushDownLimit == null || hasNonPartitionFilter()) - # return Optional.empty(); // skip pushdown - # Java skips limit pushdown entirely whenever the predicate touches - # any non-partition column, because per-split row counts (which the - # accumulator below sums) are pre-filter and would over-count - # against the actual filtered output. Mirror that here: when the - # predicate references a column outside ``partition_keys`` we - # bail out and return the splits untouched. + """Mirror Java ``DataTableBatchScan.applyPushDownLimit``. + + Bail out when there's no limit or the predicate touches any + non-partition column (per-split row counts are pre-filter, so + summing them against a filtered-row budget would early-return + the loop too soon). Otherwise accumulate ``merged_row_count`` + — the DV-aware count, equivalent to Java's + ``Split.mergedRowCount`` — and return the collected sublist + once the budget is met. Splits whose merged count is unknown + (non-raw, or data-evolution layouts missing ``first_row_id``) + are skipped from the accumulator and reach the reader via the + fall-through return. + """ if self.limit is None: return splits if self._has_non_partition_filter(): return splits - # Java L138, L146: scannedRowCount=0; limitedSplits = new ArrayList<>(). scanned_row_count = 0 limited_splits: List[DataSplit] = [] - - # Java L147-163: iterate, only count splits whose mergedRowCount - # is present, accumulate that DV-aware count, early-return the - # collected sublist as soon as the budget is met. The previous - # Python code accumulated ``split.row_count`` (DV-blind upper - # bound) and over-counted when DV was on, causing the early - # return to fire before the reader could actually produce the - # limit rows; Java uses ``mergedRowCount`` for exactly this - # reason. Splits whose merged count is unknown (non-raw, or - # data-evolution layouts where ``first_row_id`` is missing) are - # skipped — same as Java's ``isPresent()`` gate. for split in splits: merged = split.merged_row_count() if merged is not None: @@ -405,22 +394,12 @@ def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: scanned_row_count += merged if scanned_row_count >= self.limit: return limited_splits - - # Java L164: return Optional.of(result); // result wraps the - # original splits when the loop never reaches the budget. return splits def _has_non_partition_filter(self) -> bool: - """Mirror Java ``SnapshotReaderImpl.hasNonPartitionFilter`` - (paimon-core/.../source/snapshot/SnapshotReaderImpl.java:235-248): - when the user-provided predicate references any column outside - the partition keys, the non-partition portion is non-empty and - the flag is true. The Java ``applyPushDownLimit`` short-circuits - on this flag so per-split row counts (pre-filter) never act as - the limit budget. - - Python equivalent: collect the predicate's referenced field set - and check whether it has any element outside ``partition_keys``. + """Mirror Java ``SnapshotReaderImpl.hasNonPartitionFilter``: + true when the predicate references any column outside the + partition keys (i.e. its non-partition portion is non-empty). """ if self.predicate is None: return False From ba5df58f4ce24e692b783b44bf0c9441420c6bb9 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 17:01:25 +0800 Subject: [PATCH 6/8] [python] FileScanner: trim verbose docstrings on push-down-limit helpers Address review on PR #7742: shrink the multi-paragraph rationale on ``_apply_push_down_limit`` / ``_has_non_partition_filter`` to a single line each pointing at the Java counterpart. The full reasoning lives in the PR description; the file just needs to say what it mirrors. --- .../pypaimon/read/scanner/file_scanner.py | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index 4aa28ece0bb8..4221ad576dc8 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -367,18 +367,10 @@ def with_global_index_result(self, result) -> 'FileScanner': return self def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: - """Mirror Java ``DataTableBatchScan.applyPushDownLimit``. - - Bail out when there's no limit or the predicate touches any - non-partition column (per-split row counts are pre-filter, so - summing them against a filtered-row budget would early-return - the loop too soon). Otherwise accumulate ``merged_row_count`` - — the DV-aware count, equivalent to Java's - ``Split.mergedRowCount`` — and return the collected sublist - once the budget is met. Splits whose merged count is unknown - (non-raw, or data-evolution layouts missing ``first_row_id``) - are skipped from the accumulator and reach the reader via the - fall-through return. + """Mirror Java ``DataTableBatchScan.applyPushDownLimit``: sum the + DV-aware ``merged_row_count`` (== Java ``partialMergedRowCount``) + until the limit is met. Splits with unknown merged count fall + through to the reader unchanged. """ if self.limit is None: return splits @@ -397,10 +389,7 @@ def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: return splits def _has_non_partition_filter(self) -> bool: - """Mirror Java ``SnapshotReaderImpl.hasNonPartitionFilter``: - true when the predicate references any column outside the - partition keys (i.e. its non-partition portion is non-empty). - """ + """Mirror Java ``SnapshotReaderImpl.hasNonPartitionFilter``.""" if self.predicate is None: return False partition_keys = set(self.table.partition_keys or []) From e7a8f3bb29dc0747659080aa0858db489a7eb353 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 May 2026 22:35:57 +0800 Subject: [PATCH 7/8] [python] FileScanner: trim ApplyPushDownLimitUnitTest docstrings, fix description drift MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review comment r3173561771: tighten the new unit-test docstrings and correct the parts that no longer match the implementation. * Class-level rationale dropped — the cases speak for themselves. * test_dv_aware_accumulator_uses_merged_row_count: previous wording said the post-fix loop "adds the two non-raw splits without changing the accumulator". That's wrong: ``merged is None`` splits are NOT appended to ``limited_splits``; the three-split result comes from the fall-through ``return splits`` after the loop completes. Updated to say so. * Other docstrings shrunk to one or two lines each. * _apply / _split helpers: dropped the inline narration on the fake scanner / fake split — they're trivially obvious from the bodies. --- .../tests/reader_split_generator_test.py | 82 ++++--------------- 1 file changed, 14 insertions(+), 68 deletions(-) diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py index f00c011df1e0..00b363fd3337 100644 --- a/paimon-python/pypaimon/tests/reader_split_generator_test.py +++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py @@ -331,29 +331,12 @@ def test_sliced_split_merged_row_count(self): class ApplyPushDownLimitUnitTest(unittest.TestCase): - """Direct, mock-driven coverage of ``FileScanner._apply_push_down_limit``. - - Pypaimon's writer doesn't compact L0 → L1+, and the DV-enabled - PK-table read path skips L0 files, so a true DV-aware - ``raw_convertible`` split (where ``merged_row_count < row_count``) - is hard to produce from a pure-Python end-to-end fixture. The - accumulator semantics, however, are a simple loop on the splits - list — exercise it directly with synthetic split stand-ins. - - These cases pin down the correctness contract without depending on - storage layout: the accumulator must use ``merged_row_count`` - (matching Java's ``partialMergedRowCount``) and must keep every - split it has visited up to and including the one that meets the - budget. - """ + """Mock-driven coverage of ``FileScanner._apply_push_down_limit``.""" @staticmethod def _apply(splits, limit, has_non_partition_filter=False): from pypaimon.read.scanner.file_scanner import FileScanner - # Stand in for ``self`` — only ``self.limit`` and the - # ``_has_non_partition_filter`` short-circuit are exercised by - # the method under test. class _FakeScanner: pass @@ -371,61 +354,31 @@ class _FakeSplit: s.raw_convertible = raw_convertible s.row_count = row_count s._merged = merged_row_count - - def _merged_fn(): - return s._merged - - s.merged_row_count = _merged_fn + s.merged_row_count = lambda: s._merged return s def test_dv_aware_accumulator_uses_merged_row_count(self): - """[raw(row_count=10, dv→merged=4), non-raw, non-raw] + limit=5. - - Pre-fix accumulator (``+= row_count``): the raw split's pre-DV - count of 10 already meets ``limit=5``, the loop early-returns - with just ``[raw]`` and the two non-raw splits are dropped. The - reader can then only see 4 rows from the DV split — silently - less than ``limit``. - - Post-fix accumulator (``+= merged_row_count``): only 4 rows of - budget after the raw split, so 4 < 5; the loop keeps walking, - adds the two non-raw splits without changing the accumulator, - and falls through to ``return splits`` with all three. The - reader then has enough material across three splits to produce - ``limit`` rows. - """ + """DV-aware raw split + trailing non-raw splits, ``limit > merged``: + pre-fix (``+= row_count``) early-returns ``[raw]``; post-fix + (``+= merged_row_count``) leaves the budget at 4 < 5, the loop + completes, and the fall-through returns all three splits.""" s_raw = self._split(raw_convertible=True, row_count=10, merged_row_count=4) s_nr1 = self._split(raw_convertible=False, row_count=10, merged_row_count=None) s_nr2 = self._split(raw_convertible=False, row_count=10, merged_row_count=None) result = self._apply([s_raw, s_nr1, s_nr2], limit=5) - self.assertEqual( - len(result), 3, - "merged_row_count accumulator must NOT early-return after a " - "DV-aware raw split whose post-DV count is below the limit; " - "got {}".format([id(s) for s in result]), - ) + self.assertEqual(len(result), 3) def test_accumulator_skips_splits_with_unknown_merged_count(self): - """``merged_row_count`` returns ``None`` for layouts where the - DV cardinality / data-evolution range isn't recorded yet (e.g. - a non-raw split, or a raw split missing DV cardinality in the - manifest). Such splits cannot meaningfully contribute to the - budget — Java's ``applyPushDownLimit`` skips them in the - accumulator loop and falls through to the full split list when - the loop completes without reaching the limit. We mirror that: - with a single split whose ``merged_row_count`` is unavailable, - the loop never accumulates anything and we return the input - unchanged.""" + """A split whose ``merged_row_count()`` returns ``None`` does not + contribute to the budget; the loop completes and returns the + input via the fall-through.""" s = self._split(raw_convertible=True, row_count=10, merged_row_count=None) result = self._apply([s], limit=5) - self.assertEqual(len(result), 1) - self.assertIs(result[0], s) + self.assertEqual(result, [s]) def test_no_raw_splits_falls_through_to_full_list(self): - """No raw splits → accumulator never moves → loop completes → - fallback returns the full list (matching Java's behaviour for - the all-non-raw case).""" + """No split contributes to the budget → fall-through returns all.""" s1 = self._split(raw_convertible=False, row_count=10, merged_row_count=None) s2 = self._split(raw_convertible=False, row_count=10, merged_row_count=None) result = self._apply([s1, s2], limit=5) @@ -440,16 +393,9 @@ def test_no_limit_returns_input_unchanged(self): self.assertEqual(result, [s]) def test_non_partition_filter_short_circuits_pushdown(self): - """Java ``applyPushDownLimit`` (DataTableBatchScan.java:129) bails - out when ``hasNonPartitionFilter()`` is true: per-split row counts - are pre-filter, so summing them against ``limit`` would - over-count when the predicate further filters rows. The Python - port must do the same — return the splits untouched and let the - reader apply both filter and limit.""" + """Predicate touching a non-partition column → no pushdown, + regardless of how many DV-aware splits the plan contains.""" s_raw = self._split(raw_convertible=True, row_count=10, merged_row_count=10) - # With a non-partition filter, the early-return branch trips - # before we even look at merged_row_count, so the budget never - # narrows the plan. result = self._apply( [s_raw, s_raw, s_raw], limit=5, has_non_partition_filter=True) self.assertEqual(len(result), 3) From 91dc69f304aa585041b84ed41bac2800b4098220 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Sat, 2 May 2026 18:50:43 +0800 Subject: [PATCH 8/8] [python] FileScanner: align docstring with Java Split.mergedRowCount Reviewer pointed out the docstring named the Java method ``partialMergedRowCount`` while the actual API is ``Split.mergedRowCount()`` (DataTableBatchScan.applyPushDownLimit calls ``split.mergedRowCount()``). Pick the real name so future readers cross-referencing Java don't get tripped up. --- paimon-python/pypaimon/read/scanner/file_scanner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index 4221ad576dc8..92ef9f88389c 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -368,7 +368,7 @@ def with_global_index_result(self, result) -> 'FileScanner': def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]: """Mirror Java ``DataTableBatchScan.applyPushDownLimit``: sum the - DV-aware ``merged_row_count`` (== Java ``partialMergedRowCount``) + DV-aware ``merged_row_count`` (== Java ``Split.mergedRowCount()``) until the limit is met. Splits with unknown merged count fall through to the reader unchanged. """