[feat](storage) Implement adaptive batch size for SegmentIterator #61535
mrhhsg wants to merge 1 commit into apache:master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
run buildall
/review
Code Review Summary
This PR introduces an EWMA-based AdaptiveBlockSizePredictor that dynamically adjusts SegmentIterator chunk row counts to target a preferred block byte size. The overall approach is sound, but several issues need to be addressed.
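For reference, the EWMA feedback loop described above can be sketched roughly as follows. This is a minimal illustration only: the class name, members, and smoothing factor are assumptions, not the PR's actual code.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical sketch of an EWMA bytes-per-row estimator that converts a
// byte budget into a predicted row count for the next batch.
class EwmaRowPredictor {
public:
    explicit EwmaRowPredictor(size_t block_size_bytes, double alpha = 0.3)
            : _block_size_bytes(block_size_bytes), _alpha(alpha) {}

    // Feed back the observed size of the batch that was just read.
    void update(size_t batch_bytes, size_t batch_rows) {
        if (batch_rows == 0) return;
        double observed =
                static_cast<double>(batch_bytes) / static_cast<double>(batch_rows);
        // Seed on first observation, then exponentially smooth.
        _bytes_per_row = (_bytes_per_row == 0.0)
                ? observed
                : _alpha * observed + (1.0 - _alpha) * _bytes_per_row;
    }

    // Predict how many rows fit in the byte budget; fall back to max_rows
    // until there is history.
    size_t predict_next_rows(size_t max_rows) const {
        if (_bytes_per_row <= 0.0) return max_rows;
        size_t predicted = static_cast<size_t>(
                static_cast<double>(_block_size_bytes) / _bytes_per_row);
        if (predicted < 1) predicted = 1;
        return predicted > max_rows ? max_rows : predicted;
    }

private:
    size_t _block_size_bytes;
    double _alpha;
    double _bytes_per_row = 0.0;  // 0.0 means "no history yet"
};
```

The clamp to `[1, max_rows]` mirrors the row-limited behavior the review discusses: the predictor can only shrink a batch, never grow it past the configured row maximum.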
Critical Checkpoints
Goal and correctness: The feature aims to dynamically adjust batch sizes to target byte-based block sizes instead of fixed row counts. The implementation adds prediction at the SegmentIterator level and byte-budget stops at BlockReader and VCollectIterator. There are correctness bugs in the record_rowids path (see inline comments).
Concurrency: The predictor is single-threaded per instance and used within a single SegmentIterator — no concurrency concern.
Lifecycle / SIOF: No static initialization order issues. The predictor is created as a unique_ptr member.
Config items: enable_adaptive_batch_size is dynamic (mutable), which is appropriate. Session variables preferred_block_size_bytes and preferred_max_column_in_block_size_bytes are correctly propagated via Thrift.
Incompatible changes: New Thrift field IDs (210, 211) have defaults matching the session variable defaults. Backward compatible.
Parallel code paths: The byte-budget check is added to _replace_key_next_block, _unique_key_next_block, _agg_key_next_block, and VCollectIterator::_merge_next. The _direct_next_block path (DUP KEY) relies on the SegmentIterator-level predictor limiting block_row_max, which is correct by design.
Test coverage: Regression tests cover DUP, AGG, and UNIQUE key table types. Unit tests are thorough for the predictor class itself. However, no test covers the record_rowids path which has bugs.
Observability: Profile counters AdaptiveBatchPredictMinRows and AdaptiveBatchPredictMaxRows are added. However, a LOG(INFO) debug log is left in the hot path (see inline).
Performance: Block::columns_byte_size() is called per-row in some paths. Each call is O(num_columns) with O(1) per column. Acceptable for typical column counts but could be a concern for very wide tables (hundreds of columns).
Issues Found
- [Bug] `_replace_key_next_block` and `_unique_key_next_block`: missing `_block_row_locations.resize(target_block_row)` when the byte-budget break triggers with `record_rowids` enabled. This will cause the DCHECK at line 604 to fail and can return stale/incorrect row locations.
- [Bug/Severity: High] `LOG(INFO)` with `******` prefix on every `predict_next_rows()` call — this is a debug log accidentally left in production code. It will spam logs on every batch of every segment scan.
- [Minor] The `_replace_key_next_block` function does not call `block->set_columns()` after mutating columns — this is pre-existing behavior but worth noting for consistency.
- [Observation] In `VCollectIterator::_merge_next`, the byte-budget check uses `Block::columns_byte_size(target_columns)` before flushing the pending `continuous_row_in_block` rows, so the actual output block can exceed the budget. This is a design choice (conservative), not a correctness bug.
```cpp
        static_cast<size_t>(static_cast<double>(_block_size_bytes) / estimated_bytes_per_row);
LOG(INFO) << "****** predicted rows: " << predicted
          << ", estimated_bytes_per_row: " << estimated_bytes_per_row
          << ", _block_size_bytes: " << _block_size_bytes;
```
[Bug/High] This LOG(INFO) with ****** prefix is called on every predict_next_rows() invocation — i.e., every batch in every segment scan. This will generate massive log spam in production.
This should be changed to VLOG_DEBUG or removed entirely. At minimum, this is clearly a debug statement accidentally left in:
```cpp
VLOG_DEBUG << "predicted rows: " << predicted
           << ", estimated_bytes_per_row: " << estimated_bytes_per_row
           << ", _block_size_bytes: " << _block_size_bytes;
```

```cpp
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
        break;
    }
```
[Bug] When record_rowids is enabled, this byte-budget break exits the outer while loop without resizing _block_row_locations to target_block_row. At line 349, _block_row_locations was resized to batch_size(), so after this break it will still have batch_size() entries instead of target_block_row entries.
The EOF path at lines 372-374 correctly does _block_row_locations.resize(target_block_row). This break path needs the same treatment:
```cpp
if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
    if (UNLIKELY(_reader_context.record_rowids)) {
        _block_row_locations.resize(target_block_row);
    }
    break;
}
```

Without this fix, `current_block_row_locations()` returns a vector with stale entries, and the DCHECK at line 604 (`DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count)`) will fail.
```cpp
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
        break;
    }
```
[Bug] Same issue as in _replace_key_next_block: when record_rowids is enabled, this byte-budget break exits without resizing _block_row_locations to target_block_row. At line 530, _block_row_locations was resized to _reader_context.batch_size. The EOF path at lines 548-550 correctly resizes.
Fix:
```cpp
if (config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
    Block::columns_byte_size(target_columns) >=
            _reader_context.preferred_block_size_bytes) {
    if (UNLIKELY(_reader_context.record_rowids)) {
        _block_row_locations.resize(target_block_row);
    }
    break;
}
```

Note: in `_unique_key_next_block`, there is also the `_delete_sign_available` filtering path (starting at line 566) that uses `target_block_row` and `_block_row_locations`. An incorrectly sized `_block_row_locations` could corrupt the filter logic.
TPC-H: Total hot run time: 30963 ms
TPC-DS: Total hot run time: 173281 ms
BE UT Coverage Report: increment line coverage
BE Regression && UT Coverage Report: increment line coverage
daebf3f to f404b06
run buildall
/review
Code Review Summary: feat Implement adaptive batch size for SegmentIterator
This PR introduces an EWMA-based AdaptiveBlockSizePredictor that dynamically adjusts chunk row counts so each output Block stays near a configurable byte target. The overall design is sound, with proper plumbing from FE session variables through Thrift to the storage layer. The byte-budget checks in BlockReader and VCollectIterator are placed at safe boundaries.
However, I found several issues ranging from a unit test correctness bug to regression test standard violations.
Critical Checkpoints
1. Goal & Correctness: The PR achieves its stated goal of adaptive batch sizing. The EWMA predictor logic is correct. Byte-budget stops in _replace_key_next_block, _unique_key_next_block, _agg_key_next_block, and _merge_next are all placed at safe row boundaries. No data correctness issues found in the core logic.
2. Focused & Minimal: The change is reasonably focused. Adding new session variables, config flags, thrift fields, the predictor, and byte-budget stops across multiple layers is inherent to the feature.
3. Concurrency: The TOCTOU race in _collect_profile_before_close for min/max counter updates is benign (display-only counters) but should be documented or fixed. See inline comment.
4. Lifecycle: No lifecycle issues. The predictor is owned by SegmentIterator and destroyed with it.
5. Configuration: enable_adaptive_batch_size is mutable and appropriately documented. Session variables preferred_block_size_bytes and preferred_max_column_in_block_size_bytes are properly forwarded.
6. Incompatible changes: Thrift fields 210/211 are optional with correct IDs. No compatibility issues. (Note: PR description incorrectly states fields 204/205.)
7. Parallel code paths: The three BlockReader paths (_replace_key, _agg_key, _unique_key) are all covered. VCollectIterator merge path is covered. Compaction paths are not affected (they don't set preferred_block_size_bytes).
8. Test coverage: Unit tests are comprehensive (19+ test cases) but have a critical mock bug that makes metadata-hint tests read indeterminate memory. Regression tests have standard violations. See inline comments.
9. Observability: Profile counters AdaptiveBatchPredictMinRows/MaxRows are added. The else branch stats overwrite is problematic. See inline comment.
10. Performance: Block::columns_byte_size() is O(num_columns) per call (all byte_size() implementations are O(1)). Called per-row in _unique_key_next_block and _merge_next, this adds measurable but acceptable overhead for wide tables.
Issues Found
| # | Severity | File | Issue |
|---|---|---|---|
| 1 | High | Unit test | MockSegment::num_rows() mock is broken — Segment::num_rows() is non-virtual, so ON_CALL has no effect through Segment& reference. Tests using metadata hints read indeterminate _num_rows. |
| 2 | Medium | Regression test | Uses qt_ prefix instead of order_qt_ (violates regression test standards) |
| 3 | Medium | Regression test | Drops 4 of 5 tables after use (should only drop before, to preserve debug state) |
| 4 | Low | olap_scanner.cpp | TOCTOU race on min/max counter updates (benign for display-only counters) |
| 5 | Low | segment_iterator.cpp | else branch overwrites stats every call when predictor is inactive, producing misleading values |
| 6 | Info | PR description | States thrift fields 204/205 but actual code uses 210/211 |
```cpp
// Set up num_rows mock.
ON_CALL(*seg, num_rows()).WillByDefault(Return(num_rows));
```
Bug (High): Segment::num_rows() is non-virtual (declared as uint32_t num_rows() const { return _num_rows; } in segment.h:119). The ON_CALL(*seg, num_rows()).WillByDefault(Return(num_rows)) configures gmock's virtual mock method, but when predict_next_rows() calls segment.num_rows() through a Segment& reference, the compiler statically dispatches to the base class's non-virtual method, bypassing the mock entirely.
The actual value returned will be whatever _num_rows was left at by the base Segment constructor — which does not initialize _num_rows in its member initializer list (see segment.cpp:168-174). This means tests like PredictNoHistoryMetadataHint are reading indeterminate memory (undefined behavior).
Fix: since `MockSegment` is declared as a friend of `Segment`, you can directly set the member:

```cpp
seg->_num_rows = num_rows;
```

instead of (or in addition to) the `ON_CALL` mock.
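For illustration, the static-dispatch pitfall can be reproduced with a standalone toy example (hypothetical `BaseSegment`/`MockSeg` types, not the real Doris classes):

```cpp
#include <cassert>

// A non-virtual method called through a base reference is statically
// dispatched, so a derived-class "override" is never reached.
struct BaseSegment {                    // stands in for Segment
    int num_rows() const { return 7; }  // non-virtual, like Segment::num_rows()
    virtual int virt_rows() const { return 7; }
    virtual ~BaseSegment() = default;
};

struct MockSeg : BaseSegment {          // stands in for MockSegment
    int num_rows() const { return 42; } // hides the base method, does not override it
    int virt_rows() const override { return 42; }
};

// Through a BaseSegment&, the non-virtual call resolves at compile time.
int rows_via_base(const BaseSegment& s) { return s.num_rows(); }
// The virtual call resolves at runtime and reaches the derived class.
int virt_via_base(const BaseSegment& s) { return s.virt_rows(); }
```

Here `rows_via_base` always returns the base value regardless of the dynamic type, which is exactly why gmock's `ON_CALL` cannot intercept a non-virtual `num_rows()` through a `Segment&`.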
```cpp
if (stats.adaptive_batch_size_predict_max_rows > 0) {
    auto cur_min = local_state->_adaptive_batch_predict_min_rows_counter->value();
    if (cur_min == 0 || stats.adaptive_batch_size_predict_min_rows < cur_min) {
```
Minor (Low): This read-check-store pattern on shared RuntimeProfile::Counter is a classic TOCTOU race. Multiple scanners call _collect_profile_before_close concurrently on the same local_state counters. Thread A can read a stale cur_min, pass the check, and overwrite thread B's correct value.
Since these are display-only profile counters, this is benign — the worst outcome is a slightly inaccurate min/max in the query profile. But consider documenting this or using COUNTER_UPDATE with atomic min/max if RuntimeProfile::Counter supports it.
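If the counter's underlying storage were exposed as an atomic, the race could be closed with a CAS loop along these lines. This is a hedged sketch with a hypothetical helper, assuming the min counter is seeded with INT64_MAX rather than the 0 sentinel used in the current code:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Lock-free "update minimum": retries until either the candidate is no
// longer smaller than the stored value, or the CAS succeeds. This avoids
// the read-check-store window of the current pattern.
inline void atomic_update_min(std::atomic<int64_t>& cur_min, int64_t candidate) {
    int64_t observed = cur_min.load(std::memory_order_relaxed);
    while (candidate < observed &&
           !cur_min.compare_exchange_weak(observed, candidate,
                                          std::memory_order_relaxed)) {
        // compare_exchange_weak refreshed `observed`; the loop re-checks.
    }
}
```

A symmetric `atomic_update_max` with the comparison flipped would cover the max counter.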
```groovy
def res_enabled = sql "select id, length(c1) as l1, length(c2) as l2, length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4"
```

```groovy
qt_wide "select id, length(c1) as l1, length(c2) as l2, length(c3) as l3 from abs_wide_table order by 1, 2, 3, 4 limit 50"
```
Standards violation (Medium): Per regression test standards, use order_qt_ prefix instead of qt_ to ensure deterministic ordered output. This applies to all query tags in this file (qt_wide, qt_narrow, qt_agg, qt_unique, qt_flag).
While some of these queries have explicit ORDER BY or return single rows, the standard convention is to consistently use order_qt_ prefix.
```groovy
// toward returning close to max_rows (batch is still row-limited).
```

```groovy
sql "drop table if exists abs_narrow_table"
sql """
```
Standards violation (Medium): Per regression test standards: "After completing tests, do not drop tables; instead drop tables before using them in tests, to preserve the environment for debugging."
This drop table after use (and similar ones for abs_agg_table, abs_unique_table, abs_flag_table) should be removed. The drop table if exists before CREATE TABLE at the beginning of each test case is the correct pattern (and is already present).
```cpp
                static_cast<int64_t>(predicted));
} else {
    _opts.stats->adaptive_batch_size_predict_min_rows = _opts.block_row_max;
    _opts.stats->adaptive_batch_size_predict_max_rows = _opts.block_row_max;
```
Minor (Low): When _block_size_predictor is null (feature disabled), this else branch unconditionally overwrites adaptive_batch_size_predict_min_rows and adaptive_batch_size_predict_max_rows with _opts.block_row_max on every next_batch() call. Since OlapReaderStatistics is shared across segment iterators for the same scanner, and adaptive_batch_size_predict_min_rows is initialized to INT64_MAX, the first segment iterator's overwrite clobbers the sentinel.
This means the profile counters will show misleading values when the feature is disabled. Consider only setting these once, or guarding with a check (e.g., only set if currently INT64_MAX / 0).
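One possible shape of the guarded update suggested above, as a self-contained sketch with hypothetical stand-in types (not the actual OlapReaderStatistics code):

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Stand-in for the shared stats struct: min starts at the INT64_MAX
// sentinel, max at 0, matching the initialization described in the review.
struct Stats {
    int64_t predict_min_rows = std::numeric_limits<int64_t>::max();
    int64_t predict_max_rows = 0;
};

// Write the disabled-feature fallback only while the sentinel is still
// present, so values recorded by an active predictor are not clobbered.
void record_disabled_fallback(Stats& s, int64_t block_row_max) {
    if (s.predict_min_rows == std::numeric_limits<int64_t>::max()) {
        s.predict_min_rows = block_row_max;
    }
    if (s.predict_max_rows == 0) {
        s.predict_max_rows = block_row_max;
    }
}
```

With this guard, a segment iterator running with the feature disabled seeds the counters once, but never overwrites a minimum already recorded by an iterator whose predictor is active.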
Cloud UT Coverage Report: increment line coverage
TPC-H: Total hot run time: 27323 ms
TPC-DS: Total hot run time: 169237 ms
BE UT Coverage Report: increment line coverage
BE Regression && UT Coverage Report: increment line coverage
Introduce an EWMA-based AdaptiveBlockSizePredictor that dynamically adjusts SegmentIterator chunk row counts so each output Block stays near the session-variable-configured preferred_block_size_bytes target. A complementary byte-budget stop condition is added to all BlockReader and VCollectIterator accumulation loops so the final Block returned to the upper layer is also bounded.

Key changes:
- New BE config: enable_adaptive_batch_size (default: true)
- New session variables: preferred_block_size_bytes (8 MB), preferred_max_column_in_block_size_bytes (1 MB)
- New Thrift fields 204/205 for FE→BE propagation
- AdaptiveBlockSizePredictor: EWMA per-row and per-column byte estimator with conservative segment-metadata bootstrap
- SegmentIterator: predicts rows before each next_batch call; updates EWMA on success path
- BlockReader: byte-stop in _replace_key_next_block, _unique_key_next_block, _agg_key_next_block
- VCollectIterator: byte-stop in Level1Iterator::_merge_next
- Unit tests and regression tests included

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
f404b06 to ae43fe9
run buildall
Cloud UT Coverage Report: increment line coverage
BE UT Coverage Report: increment line coverage
TPC-H: Total hot run time: 26528 ms
TPC-DS: Total hot run time: 169678 ms
BE Regression && UT Coverage Report: increment line coverage
Introduce an EWMA-based AdaptiveBlockSizePredictor that dynamically adjusts SegmentIterator chunk row counts so each output Block stays near the session-variable-configured preferred_block_size_bytes target. A complementary byte-budget stop condition is added to all BlockReader and VCollectIterator accumulation loops so the final Block returned to the upper layer is also bounded.
Key changes:
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)