Skip to content

feat(merge): streaming column-major Parquet merge engine (10 inner PRs)#6459

Open
g-talbot wants to merge 10 commits into
mainfrom
gtt/parquet-streaming-base
Open

feat(merge): streaming column-major Parquet merge engine (10 inner PRs)#6459
g-talbot wants to merge 10 commits into
mainfrom
gtt/parquet-streaming-base

Conversation

@g-talbot
Copy link
Copy Markdown
Contributor

Summary

Lands the streaming column-major Parquet merge engine and surrounding plumbing on main. This rolls up ten inner PRs that previously targeted the long-lived feature branch gtt/parquet-streaming-base. Each commit in this PR is the squash of one of those inner PRs and references its number so the per-PR history remains discoverable.

The new engine is default off: IndexerConfig::parquet_merge_use_streaming_engine defaults to false, and config/quickwit.yaml does not set the flag. The in-memory engine (merge_sorted_parquet_files) is the runtime fallback for regular merges. Promotion merges (those with target_prefix_len_override) always run through the streaming engine because the in-memory path can't handle mixed prefix lengths.

Inner PRs (oldest → newest)

# Commit subject
#6406 ColumnPageStream trait — single contract for streaming + legacy inputs
#6408 legacy multi-RG input adapter for ColumnPageStream
#6407 page-bounded Arrow decoder per data page
#6409 streaming column-major merge engine with page-bounded body cols
#6424 per-region engine + multi-output sorted_series splitting
#6425 prefix-aware output with caller-supplied target_prefix_len
#6426 adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers
#6423 legacy promotion path + body-col schema evolution
#6428 adversarial-review test coverage (F4/F5/F7) + F14 sub-region engine fix
#6441 YAML flag to route regular merges through streaming engine (default off, no-op rollout)

Each inner PR went through review on gtt/parquet-streaming-base before being merged there. This PR is the integration step; the individual reviews are linked from each commit.

Rollout

  • New code path is opt-in via indexer.parquet_merge_use_streaming_engine: true in quickwit.yaml.
  • Promotion merges (mixed rg_partition_prefix_len inputs) always use the streaming engine — that's the path's reason to exist; the in-memory engine bails before producing output.
  • Both paths share the same input/output schema and same metadata aggregator, so production can flip the flag back at runtime without redeploy.

Test plan

  • cargo fmt --all -- --check
  • cargo clippy --workspace --tests --all-features (no errors; warnings unchanged from main)
  • cargo doc --no-deps
  • cargo machete
  • license headers + log format checks
  • full cargo nextest run --all-features in CI

🤖 Generated with Claude Code

g-talbot and others added 4 commits May 20, 2026 09:40
… inputs (#6406)

Extracts a `pub trait ColumnPageStream` from PR-4's concrete reader so PR-5
(legacy adapter) and PR-6 (streaming merge engine) can land in parallel
against a stable contract instead of one rewriting the other.

Trait shape:
  fn metadata(&self) -> &Arc<ParquetMetaData>
  async fn next_page(&mut self) -> Result<Option<Page>, ParquetReadError>

Same invariants PR-4 already guarantees: row-group-major /
column-major-within-RG / page-major-within-column yield order, idempotent
EOF, and I/O failures surface as ParquetReadError::Io rather than being
masked as decode errors.

Implements the trait for `StreamingParquetReader`. Promotes visibility
of `Page`, `ParquetReadError`, `RemoteByteSource`, `StreamingParquetReader`,
and `StreamingReaderConfig` from `pub(crate)` to `pub` so downstream
crates and PR-5/PR-6 can consume them.

Adds `test_streaming_reader_satisfies_column_page_stream_trait`: drains
the same fixture through both the concrete-typed and trait-object surfaces,
asserts identical (rg_idx, col_idx, page_idx_in_col, compressed_page_size)
sequences and idempotent EOF through the trait.

Pure refactor — no behavior change.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* feat: legacy multi-RG input adapter for ColumnPageStream

Buffers a legacy parquet file fully, decodes via Arrow, concatenates
into a single RecordBatch preserving order, and re-encodes as a
single-row-group parquet stream that StreamingParquetReader can serve
through the ColumnPageStream contract. Carries forward the original
file's key_value_metadata and sorting_columns so downstream consumers
(merge engine, metadata readers) see an identical logical view. This
unblocks the merge engine's column-major streaming path on files
where the original RG layout is misaligned with the sort prefix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: byte-equal data roundtrip through reencode helper

`test_data_roundtrip_through_adapter` checks row count and schema
names through the streaming path; that catches dropped rows but not
value-level corruption (e.g. a hypothetical dictionary key XOR or
column-value swap during the decode/concat/re-encode chain). Adds
`test_reencode_preserves_arrays_byte_equal` calling
`reencode_as_single_row_group` directly against a multi-RG fixture
that includes dict-encoded columns and nulls, and asserts each
column equals the oracle byte-for-byte.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* feat: page-stream → RecordBatch decoder (PR-6a)

Bridges PR-4's ColumnPageStream (raw compressed pages in storage order)
to arrow's standard ParquetRecordBatchReaderBuilder (decoded arrays).
PR-6's streaming merge engine drains each input row-group through this
to keep per-RG memory bounded — only one input RG worth of bytes is
materialised at a time, rather than the whole file.

Approach: reconstruct one row group's column-chunk byte layout in a
buffer (column chunks placed at their original offsets, gaps zero-
padded), wrap the buffer in `Bytes`, and feed it to
`ParquetRecordBatchReaderBuilder::new_with_metadata` with
`with_row_groups([rg_idx])`. Byte-exact reconstruction by carrying
each page's original Thrift-compact `header_bytes` through PR-4's
streaming reader — no re-encoding, so encoder-version drift inside
the compactor cannot silently corrupt outputs.

Adds `header_bytes: Bytes` to `Page` and captures the drained
header bytes inside `parse_page_header_streaming`. New
`StreamDecoder` borrows the stream and exposes `next_rg()` returning
one `RecordBatch` per input row group, idempotent at EOF.

Tests (9, all passing): single-RG and multi-RG drains, multi-page
columns, dict columns, null preservation, compression codec roundtrip
(uncompressed/snappy/zstd — LZ4 not enabled in our parquet feature
set), idempotent EOF, byte-exact reconstruction proof, and I/O failure
surfacing as PageDecodeError::PageStream rather than masked as decode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style: nightly fmt fixup

CI nightly rustfmt (newer than my local at the time of the original
push) wraps `write_parquet(...)` onto multiple lines.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat: page-bounded Arrow decoder per data page (PR-6a.2)

Replaces PR-6a's per-RG fat-buffer approach. The previous implementation
reconstructed a whole row group's column-chunk bytes into a single
buffer and fed it to ParquetRecordBatchReaderBuilder — peak memory was
RG-size (tens to hundreds of MB per call). This rewrite is
page-bounded.

API change: \`StreamDecoder::next_rg() -> Option<RecordBatch>\` is
replaced by \`decode_next_page() -> Option<DecodedPage>\`. Each call
returns one input data page's worth of decoded rows as an
\`ArrayRef\`, plus \`(rg_idx, col_idx, page_idx_in_col, row_start)\`
indexing so PR-6b's merge engine can slice take indices per page.
Dictionary pages are absorbed silently (fed to the column reader for
subsequent data-page decoding); INDEX_PAGE is skipped.

Memory at any time:
- One in-flight page (compressed + decompressed bytes)
- One cached dictionary page per (rg, col) when dict-encoded
- One column reader per (rg, col) with small bookkeeping (level
  decoders, value decoder)

Does NOT buffer the row group, a column chunk, or a materialised
RecordBatch.

Implementation: wraps parquet-rs's public \`GenericColumnReader\` over
a per-(rg, col) PageQueue we feed one page at a time. Page → ColumnPage
conversion handles decompression (via \`compression::create_codec\`,
which required enabling parquet's \`experimental\` feature on our
Cargo.toml — the API has been stable across recent parquet-rs versions,
just not yet de-experimentalised), \`format::Encoding\` (Thrift wrapper)
→ \`basic::Encoding\` translation, and DataPageV2's
unencrypted-levels-then-compressed-values layout.

Array builders cover the production schema: Boolean, Int8/16/32/64,
UInt8/16/32/64, Float32/64, Utf8/LargeUtf8/Binary/LargeBinary, and
\`List<non-nullable primitive>\` (DDSketch \`keys\` / \`counts\`). Dict
columns decode to their value type (Utf8/Binary); the merge engine's
union schema normalises strings to Utf8 anyway, and the output writer
re-applies dict encoding based on observed cardinality.

Tests (9, all passing):
- single-RG and multi-RG round-trip (per-column comparison vs. canonical
  arrow reader)
- per-page indexing (\`row_start\`, \`page_idx_in_col\` monotonic
  per-(rg, col))
- idempotent EOF
- nullable column (\`service\` with nulls every 5th row)
- compression codecs (uncompressed, snappy, zstd)
- I/O failures surface as \`PageDecodeError::PageStream\`
- \`List<UInt64>\` (DDSketch \`counts\`) with variable list lengths
  including empty list and \`u64::MAX\`
- structural page-bounded contract: PageQueue depth ≤ 2 (one queued
  dictionary plus the current data page) across a long stream

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style: drop trailing blank line in page_decoder.rs

CI's `cargo +nightly fmt --check` flags a single trailing blank
line at end of file. No functional change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(page_decoder): preserve List<Float64/32> and LargeList<T> shapes

Addresses two Codex review comments on PR-6407.

## 1. List<Float64> (and List<Float32>) flattened to flat array

`build_float64_array` and `build_float32_array` ignored
`field.data_type()` and unconditionally constructed a flat
`Float64Array` / `Float32Array`, even when the input column was
declared as `List<Float64>` / `List<Float32>` (which the streaming
writer accepts and writes). The decoded page's Arrow type / row
shape didn't match the schema — downstream `RecordBatch`
construction or merge writing would fail with a schema mismatch, or
treat list elements as rows.

The int32/int64 builders already branched to `build_list_i32_array`
/ `build_list_i64_array` when the outer type was a list. The float
builders now follow the same pattern via new `build_list_f32_array`
and `build_list_f64_array` helpers. Call sites pass `reps` (was
discarded as `_reps`).

Coverage symmetry vs `streaming_writer.rs`'s list path:
- Int8/16/32 + UInt8/16/32 (Int32-physical) ✓ — already covered
- Int64 + UInt64 (Int64-physical) ✓ — already covered
- Float32 (Float-physical) ✓ — added
- Float64 (Double-physical) ✓ — added
- Bool, Utf8, Binary — the writer rejects these as list inners
  ("only flat numeric primitive inners are supported"); the decoder
  matches.

## 2. LargeList<T> outer constructed wrong array width

`build_list_i32_array` and `build_list_i64_array` accepted both
`DataType::List(_)` and `DataType::LargeList(_)` outer types but
always constructed `arrow::array::ListArray` (i32 offsets). For a
`LargeList<UInt64>` column emitted by the streaming writer, the
decoder produced `ListArray<UInt64>` — type mismatch.

Factored the outer-list construction into a new
`wrap_inner_in_list` helper that picks `ListArray` (i32 offsets) or
`LargeListArray` (i64 offsets) based on `field.data_type()`. All
four list builders (i32, i64, f32, f64) now route through it.

`list_offsets_from_levels` now returns `Vec<i64>` so the same
offset buffer can back either array width — the helper truncates to
i32 in the `List` path.

## Round-trip caveat (documented in test)

In our pipeline `init_column_state` derives fields from
`parquet_to_arrow_schema(_, None)`, and parquet's native schema
doesn't distinguish list offset widths — so the derived field is
always `List<>`, never `LargeList<>` (the ARROW:schema KV is
deliberately bypassed to avoid Dictionary type mismatches; see the
existing comment in `init_column_state`). Round-trip from a
`LargeList<>`-typed parquet input through the decoder produces
`List<>` — that's a pre-existing limitation, not introduced here.

The `LargeList` branch of `wrap_inner_in_list` is therefore only
reachable when callers construct fields directly. The new
`test_wrap_inner_in_list_dispatches_on_outer_flavour` exercises
that branch via direct calls; documenting the dispatch contract
without requiring a parquet round-trip that the rest of the
pipeline can't currently produce.

## Tests

- `test_list_float64_round_trip` — `List<Float64>` round-trips
  through the decoder as a `ListArray` with `Float64Array` inner,
  not a flat `Float64Array`.
- `test_wrap_inner_in_list_dispatches_on_outer_flavour` — direct
  builder call with `LargeList<UInt64>` field produces
  `LargeListArray` (i64 offsets); with `List<UInt64>` produces
  `ListArray` (i32 offsets).

11/11 page_decoder tests pass; 443/443 crate tests pass. Clippy
clean, fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(page_decoder): pre-fetch one page so peek_next_page sees real metadata

Addresses Codex review comment on PR-6407: "Preserve next-page
metadata for repeated pages".

## Problem

`PageQueueReader::peek_next_page` reads from the per-(rg, col)
queue. Before this fix, `decode_next_page` queued exactly ONE page
(the current data page) before calling `read_records`. parquet-rs's
column reader uses `peek_next_page` (via `at_record_boundary`) to
decide whether to flush partial repetition-level state when
decoding V1 data pages that contain repetition (i.e. `List<>`
columns). With only the current page queued, peek returned `None`,
which parquet-rs treats as "this is the last page" — it would
flush partial rep-level state at every page boundary and could
split a list record incorrectly when a single record's list spans
multiple V1 pages.

Husky writes V1 pages by default (parquet-rs `DEFAULT_WRITER_VERSION
= PARQUET_1_0`), and DDSketch-style `List<UInt64>` columns can
exceed page-size limits when sketches are large or page sizes are
small — making this a real correctness risk.

## Fix

`decode_next_page` now maintains a **one-page lookahead**:

1. After locating a state with a queued data page to decode, it
   pulls one more page from the underlying stream and routes it to
   its (rg, col) queue (which may be the same state's queue, in
   which case `peek_next_page` returns the real next-page metadata;
   or a different state's queue, in which case the current state's
   `peek_next_page` correctly returns `None` because the current
   column chunk is exhausted from this state's perspective).
2. THEN drives `read_records` on the current state. parquet-rs's
   column reader can now correctly check record boundaries.

The decode_next_page loop is restructured: it first looks for any
state with a queued unconsumed data page (which covers both the
"freshly pulled" case and the "lookahead from a previous call"
case), and only pulls from the stream when no state has work to
do. This naturally handles the case where a previous call's
lookahead landed in a different state's queue.

Memory: one extra in-flight page (the lookahead) — still
page-bounded; `test_page_bounded_queue_depth` continues to pass.

## Tests

- `test_list_record_spanning_pages_preserved` (new): writes a
  `List<UInt64>` column with `data_page_size_limit = 20` bytes,
  forcing a 50-element list record to span multiple V1 pages.
  Verifies all 50 values are preserved intact after decoding.
  Without the lookahead, this would fail (parquet-rs would split
  the record at page boundaries).
- `test_page_bounded_queue_depth` continues to pass (lookahead
  is bounded; queue depth stays small).
- All 11 prior page_decoder tests pass.

444/444 crate tests pass. Clippy clean, fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(page_decoder): only pre-fetch lookahead for repeated columns

Follow-up to the previous commit (9e74792) which added a one-page
lookahead in `decode_next_page` to give parquet-rs's column reader
accurate `peek_next_page` metadata for V1 record-spanning lists.

The unconditional pre-fetch broke callers that drop a `StreamDecoder`
mid-traversal and create a fresh one over the same
`ColumnPageStream` — the merge engine's phase 0 (sort col drain)
followed by a phase 3 fresh body-col decoder is exactly this pattern.
The pre-fetch advanced the stream past the current column chunk,
and the new decoder didn't have the pre-fetched page in its queue,
losing the page entirely.

Fix: only pre-fetch for columns where `max_rep_level > 0` (List<T> /
LargeList<T>). Flat columns get one value per record, so V1 record
continuation across pages doesn't apply — the lookahead's only
benefit is the list-record-spanning correctness this PR was
introducing. Sort cols (metric_name, timestamp_secs, sorted_series)
are all flat → no lookahead → stream advances only over consumed
pages → safe to drop the decoder and resume on a fresh one.

`test_list_record_spanning_pages_preserved` still passes (its column
has `max_rep_level = 1`, so the lookahead activates). All 12
page_decoder tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(page_decoder): make single-consumer invariant + parquet-rs lineage explicit

Addresses adamtobey's review on PR-6407.

- Document the single-consumer invariant on `StreamDecoder`: the
  `Arc<Mutex<_>>` on per-(rg, col) queues is for parquet-rs's
  `PageReader: Send` trait shape, not concurrency. All pushes and pops
  happen synchronously within a single `decode_next_page` call, so the
  lock/unlock/lock sequences cannot race.
- Collapse `find_state_with_queued_data_page` + `decode_state_head`'s
  num_values capture into `next_decodable_head`, returning
  `((rg, col), num_values)` from one lock pass. Removes the
  TOCTOU-looking lock-find-unlock-relock-refind shape.
- Add `build_primitive::<P>(...)` helper that mirrors parquet-rs's
  `PrimitiveArrayReader::consume_batch` + `coerce_i32`/`coerce_i64`
  coercion table. Cuts the int32/int64/float32/float64 builders from
  ~80 lines of repeated null-clone + cast + typed-constructor to one
  helper call per arm.
- Module comment cites
  `parquet::arrow::array_reader::PrimitiveArrayReader` and explains
  why we re-implement: that module is `#[doc(hidden)]` and gated by
  parquet-rs's `experimental` feature, which we don't enable.

No behaviour change. 444 lib tests pass; workspace clippy + nightly
fmt + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… (PR-6b.2) (#6409)

* feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2)

Rebuilds PR-6b on top of PR-6a.2's per-page Arrow decoder. The
streaming merge engine now keeps body-col memory bounded by output
page size (not column-chunk size) while preserving caller-specified
M:N output splitting at sorted_series boundaries.

Architecture (Husky multi-input → multi-output sorted merge):

  Phase 0 (async) — drain sort cols from each input. With Husky
  column ordering, sort cols + sorted_series are the prefix of each
  row group's body bytes, so the decoder can stop after they are
  fully decoded; the remaining body col pages stay un-read in the
  input stream, ready for phase 3.

  Phase 1 — compute_merge_order over the per-input sort-col
  RecordBatches using the existing k-way (sorted_series,
  timestamp_secs) heap.

  Phase 2 — compute_output_boundaries with the caller's
  num_outputs, splitting at sorted_series transitions.

  Phase 3 (blocking + block_on bridges) — streaming write. All M
  output writers are alive for the duration. For each column in
  Husky order, every output's col K is written in turn:
   - Sort col / sorted_series: applied via arrow::interleave from
     the already-buffered phase-0 data.
   - Body col: each output page is assembled via arrow::interleave
     from input page slices, with decoders advanced page-by-page via
     handle.block_on from inside the sync iterator passed to
     write_next_column_arrays. Pages flush to the writer's sink as
     SerializedColumnWriter's page-size threshold trips — memory
     stays bounded by the in-flight output page plus a small number
     of in-flight input pages.

After all M outputs' col K is done, every input decoder is at the
start of col K+1 in its single row group. Move to col K+1.

PR-6b.2 only handles single-row-group inputs (real or PR-5-
adapter-presented). Multi-RG metric-aligned inputs are rejected
with a clear error message; supporting them requires either
consuming + discarding body cols of RG[i-1] from the stream to
reach RG[i]'s sort cols, or a second body GET — both are larger
scope changes that land in a follow-up.

Page-bounded contract verified by
test_body_col_streams_many_pages_per_column_chunk: with
data_page_row_count_limit=1000 on an 8000-row merge, the output
value column spans ≥ 2 pages, demonstrating that body col writes
respect data_page_size and do not materialise whole column chunks.

Tests (9, all passing): two-input merge, single-RG output for
single-metric_name input, total-rows-preserved across M:N,
sort-schema mismatch rejection, KV metadata propagation,
all-empty-inputs no-output, output drainable by StreamDecoder,
multi-RG input rejection, page-bounded body col streaming.

Also exposes existing helpers in merge/writer.rs as pub(super)
(apply_merge_permutation, build_merge_kv_metadata,
build_sorting_columns, resolve_sort_field_names, verify_sort_order)
so streaming.rs can reuse the same MC-3 / KV / sorting-columns
construction the non-streaming engine uses. PR-7 will fold the
non-streaming engine away.

PR-6c.2 will add file-size monitoring on top: close the current
output at the next sorted_series transition when an in-progress
file approaches the size cap, producing additional splits beyond
the caller's N.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: persist decoder + page cache across body-col passes

Address two Codex review findings on PR-6b.2 (#6409):

* P1 — Preserve decoder/page cache across output chunks. The merge
  engine was constructing a fresh `StreamDecoder` for every
  `advance_decoder_to_row` call, which reset the per-column
  `rows_decoded` counter so the second decoded page reported
  `row_start = 0` after the stream had already advanced. The page
  cache also lived on the per-output assembler, so pages whose row
  range straddled two outputs were dropped when the first output
  finished even though the stream couldn't be rewound. Both
  scenarios produced silently wrong rows or out-of-bounds panics on
  any input large enough to require multi-page advances per output
  or multi-output coverage of a single page.

  The decoder now lives on `InputDecoderState` (owned via the new
  `StreamDecoder::from_owned` constructor), and the per-input body-
  col page cache + cursor are reset only at the start of each body
  column.

* P2 — Stream body pages instead of collecting `Vec<ArrayRef>`. The
  per-output body-col write now feeds `write_next_column_arrays`
  one page at a time via `StreamingBodyColIter`, which captures
  assembly errors in a side cell so memory stays bounded by output-
  page size rather than column-chunk size.

Two regression tests cover the bug shapes — multi-page body col
within one output (2500 rows × 50-row pages) and multi-output input
where pages span output boundaries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: guard body-col path against zero-row-group inputs

Address Codex P1 (third comment) on PR-6b.2 (#6409): phase 0
explicitly accepts inputs with `num_row_groups() == 0` (returning a
zero-row sort batch), but `write_body_col_for_all_outputs`
unconditionally called `state.metadata.row_group(0)` for every
input, panicking with "index out of bounds" before the first body
column was written.

Treat zero-RG inputs the same as inputs whose schema lacks the
current column: push `None` into `input_col_indices` and skip them
for this body col. Also drop the unused `input_target_rows` vec
that was being built only for its row-group lookup side effect.

Regression test `test_zero_row_input_mixed_with_non_empty` builds a
0-row + 50-row pair and merges them through the streaming engine;
without this fix the merge blocking task panics inside parquet-rs's
`row_group()` indexing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: drop all-null sort fields from per-output streaming schema

Address Codex P2 (fourth comment) on PR-6b.2 (#6409): the schema
derivation condition `sort_optimised.has(name) || full_union.has(name)`
was tautologically true for every iterated field — every `field` came
from `full_union_schema`, so the second disjunct was always satisfied
and the intended "drop all-null sort fields" branch never fired.

Pass the sort union schema in explicitly so we can tell sort fields
apart from body fields. Sort field present in `sort_union_schema` →
keep only if `optimize_output_batch` kept it (not all-null for this
output's rows). Body field → keep unconditionally; tracking per-output
body-col presence would require pre-reading every body column for
every output, which is the column-chunk-bounded buffering the
streaming path exists to avoid.

Regression test `test_derive_output_schema_drops_all_null_sort_field`
calls the helper directly with a synthetic union + sort-optimised
pair and asserts an all-null sort field is dropped while a body
field with the same union-schema position is preserved. Verified
the test fails against the pre-fix logic with the expected
`['metric_name', 'env', 'timestamp_secs', 'value']` vs
`['metric_name', 'timestamp_secs', 'value']` mismatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore: code-quality fixes + MC-2 type round-trip test on streaming merge

Bundle three pieces:

- **Husky → neutral phrasing.** Replaced the seven "Husky" mentions in
  the streaming engine's doc-comments and error messages with neutral
  "sort-cols-first storage ordering" / "column ordering" phrasing.
  Project is Quickwit; the internal column-ordering scheme didn't
  need a separate brand in user-visible error strings.

- **One `.unwrap()` → `.expect()` in lib code.** The hashmap lookup
  in `drain_sort_cols_one_input` is guarded by a `contains_key`
  check; promote the implicit invariant to a documented panic
  message per CODE_STYLE.md.

- **`align_inputs_to_union_schema` nullability fix.** The first-sight
  branch unconditionally marked new fields nullable; the existing
  comment claims "columns that don't appear in every input must be
  nullable" but the code applied that rule to every field. Replaced
  with a two-pass scheme: track `any_nullable` and `appears_in` per
  field across all inputs, then mark nullable iff some input had it
  nullable OR the field is missing from some input. This unblocks
  `List<Float64>` columns end-to-end (the writer rejects nullable
  List; the previous behaviour forced every list column nullable on
  first sight even when every input declared it non-null).

- **MC-2 round-trip integration test.** New
  `test_mc2_all_types_round_trip_through_streaming_merge` builds two
  inputs covering every parquet physical type the decoder accepts —
  Int8/16/32/64, UInt8/16/32/64, Float32/64, Bool, Utf8,
  Dictionary<Int32, Utf8>, LargeBinary, and non-nullable
  `List<Float64>` — merges them through the streaming engine, and
  asserts every `(sorted_series_key → body-col tuple)` pair survives
  byte-equal. Storage-encoding transitions (Dict→Utf8, LargeBinary→
  Binary) are normalised in the render helper because MC-2 promises
  value preservation, not internal representation preservation.

  This test caught two real bugs while being written:
  1. Body cols must be declared in lexicographic order — the streaming
     engine assumes the storage convention and crashes mid-merge if
     they aren't. Fixture re-ordered accordingly. (Worth adding
     upfront validation in a follow-up; not in scope here.)
  2. The schema-union nullability bug above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(MS-7): page-cache bounded-memory contract is observable + asserted

Add a `#[cfg(test)] static AtomicUsize` PEAK_BODY_COL_PAGE_CACHE_LEN
that records the maximum length any input's `body_col_page_cache`
ever reached during the current merge, bumped on every page push in
`advance_decoder_to_row`. Zero production overhead — the `record_*`
helper compiles to a no-op outside test builds.

New test `test_ms7_body_col_page_cache_bounded_regardless_of_input_size`
runs the streaming merge over three input sizes (300 / 3 000 / 30 000
rows at 50-row pages) and asserts:
  1. Peak resident pages stays below a fixed ceiling (24, for the
     ratio of OUTPUT_PAGE_ROWS=1024 to input page_rows=50, plus a
     few-page slack for decoder lookahead + transients).
  2. Growth from 3 000 to 30 000 rows (10× more input pages) yields
     at most a 2-page increase in peak. The whole MS-7 claim is that
     peak does not scale with input size.

Verified the test catches a deliberate regression: removing the per-
output-page eviction loop in `assemble_one_output_page` pushed the
3 000-row peak to 60 (60 > 24) and the test failed with the expected
message.

Fixture support: `write_input_parquet_with_small_pages` now also
sets `write_batch_size` and `data_page_size` proportional to the
requested page row count. Without those, the arrow writer's defaults
(64 KiB / 1 MiB) caused `data_page_row_count_limit` to be silently
ignored and produced one giant page per column. Probed the output
via `get_column_page_reader` — 30 000 rows now produces 600 pages
per col as expected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: drive col loop from full union schema + collect service tags from sort col

Address two new Codex P2 findings on PR-6b.2 (#6409):

- **Use the full union schema when driving column writes.** The old
  `build_parent_union_schema` picked one per-output schema by field
  count and used it as the column-iteration driver. If two outputs
  drop *different* all-null sort fields and end up with the same
  field count, the first wins — and any column it dropped is never
  iterated, leaving the other output's writer missing a column or
  writing subsequent columns into the wrong slot. The doc comment
  already said "process the FULL union schema's cols in order"; the
  implementation diverged. Drive `write_all_columns` from
  `full_union_schema` directly and delete the broken heuristic.

- **Collect service names from the sort-col path too.** If the sort
  schema places `service` in the sort key
  (`metric_name|service|...`), the streaming engine writes it via
  the sort-col path and the body-col `track_service` branch never
  runs. `MergeOutputFile.low_cardinality_tags[TAG_SERVICE]` came
  back empty even though every row had a service value. Extract
  service names from `static_meta.sort_optimised` at
  `finalize_output_writer` time so the TAG_SERVICE metadata is
  accurate regardless of which write path the column took.

Two regression tests:
- `test_heterogeneous_dropped_fields_drive_from_full_union_schema`
  builds two inputs whose per-output schemas drop different all-null
  sort fields with the same field count. Each kept tag must survive
  to its output. Verified the test fails (panic on missing column)
  against the pre-fix logic.
- `test_service_as_sort_column_still_populates_low_cardinality_tags`
  uses a sort schema `metric_name|service|-timestamp_secs/V2` and
  asserts the output's `low_cardinality_tags[TAG_SERVICE]` covers
  every distinct service value. Verified the test fails against
  pre-fix `finalize_output_writer` with the expected "must contain
  TAG_SERVICE" message.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(streaming): rename to fill_page_cache_to_row + cross-input docstrings

Addresses adamtobey's review on PR-6409.

- Rename `advance_decoder_to_row` → `fill_page_cache_to_row`. The
  function's effect on the world is "add pages to the per-input cache"
  — it never advances a cursor or skips data. The old name primed
  reviewers to ask "are we skipping rows?" (which is exactly what
  Adam asked).
- Use a `rows_for_current_output` register inside
  `compute_input_row_destinations` and write to
  `rows_per_output[out_idx]` once after the inner loop; saves the
  per-row indexed store.
- Expand `body_col_page_cache` docstring with the horizontal-vs-vertical
  memory bound argument and a pointer to the MS-7 invariant test
  (`test_ms7_body_col_page_cache_bounded_regardless_of_input_size`).
- Add context comments at the cross-file invariant sites Adam flagged:
  - Sort-cols-first storage-ordering contract on the sort-col drain.
  - Single-RG-input restriction with forward pointer to PR-6c.2
    (#6424) which relaxes it.
  - `rg_partition_prefix_len` defaulting to 0 (with reference to the
    legacy-promotion `mixed_prefix_ok` escape in PR-6423).

No behaviour change. 461 lib tests pass; workspace clippy + nightly
fmt + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): assert per-input body cols are in Husky order

Adam's question on L194 asked whether body-col ordering was a hard
cross-file requirement. My first answer said "no" — true for which
array we read (we look up by name), but wrong for the body-col
memory bound:

  Phase 3 iterates the union schema's body cols alphabetically and
  asks each input's decoder to advance to that col. Parquet emits
  column chunks in declared schema order, so the decoder reads pages
  in that input's storage order. If an input's body cols aren't in
  the same alphabetical-after-sort-cols order ("Husky order"),
  fill_page_cache_to_row has to drain every body col preceding the
  requested one on the wire — those pages land in
  body_col_page_caches[col_idx] until that col's turn in the union
  iteration. The cache grows to a full column-chunk's worth per
  misaligned col. Vertical, not horizontal. Defeats streaming.

Catch this at merge entry instead of silently degrading to vertical
caching:

- `assert_inputs_in_husky_body_col_order` runs after
  `build_input_decoders_state` and before phase 0. Bails with a
  concrete error message naming the offending pair of column names.
- New regression test
  `test_assert_inputs_in_husky_body_col_order_rejects_misaligned_input`
  builds an input with body cols `[value, metric_type]` (alphabetical
  would be `[metric_type, value]`) and asserts the merge errors out
  before phase 3.

No production producer violates this today (streaming writer and
legacy Husky writer both emit lexicographic body cols), so the
assertion catches future producer drift, not current traffic.

462 lib tests pass (461 prior + 1 new); workspace clippy + nightly
fmt + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot requested review from a team as code owners May 20, 2026 13:48
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 81983ce692

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +532 to +533
let stream: Box<dyn ColumnPageStream> = match op.target_prefix_len_override {
Some(target) if split.rg_partition_prefix_len < target => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Route legacy prefix-0 inputs through the adapter

When the streaming engine is enabled for regular merges, existing legacy splits with rg_partition_prefix_len == 0 and multiple row groups take this direct-reader branch because target_prefix_len_override is None. streaming_merge_sorted_parquet_files then rejects those inputs as legacy multi-RG inputs ... must go through the PR-5 adapter, so compaction stalls for old/legacy splits that the in-memory fallback can currently merge. Regular streaming merges need to detect this shape and open LegacyInputAdapter with target 0 instead of only adapting promotion inputs.

Useful? React with 👍 / 👎.

g-talbot and others added 6 commits May 20, 2026 10:10
… splitting (#6424)

* feat: per-merge-region streaming engine — multi-RG inputs + outputs (PR-6c.2)

Restructures PR-6b.2's flat phase 0 → phase 3 into a per-merge-region
loop. Unlocks multi-RG metric-aligned input support and produces
multi-RG output naturally — one output row group per merge region
(typically one per metric_name when `rg_partition_prefix_len == 1`).

Sort-prefix alignment (`prefix_len >= 1`) guarantees that any merge
region has AT MOST one row group per input. That single invariant
unlocks the restructure:

1. Pre-compute regions from RG metadata. For `prefix_len >= 1`, read
   each RG's metric_name min stat (must equal max — verifies
   metric-alignment). Group RGs across inputs by prefix_key. Sort
   regions by prefix_key. For `prefix_len == 0` (single-RG inputs
   only, validated earlier), one region covers everything.

2. Assign regions to output files by cumulative row count. Caller's
   `num_outputs` preserved as the upper bound. Each output file gets
   a contiguous slice of the region list, so output files have
   non-overlapping key ranges.

3. Per-region processing: for each region, advance contributing
   inputs' decoders through their RGs (drain sort cols of that RG,
   then stream body cols via the existing page-bounded
   BodyColOutputPageAssembler). Each region becomes one output RG in
   the current writer; when the assignment moves to a new output
   file, close the previous writer and open a new one.

The streaming body-col mechanism from PR-6b.2 (arrow::compute::
interleave + handle.block_on driven decoder) is unchanged; it just
runs over smaller row ranges (one region instead of one whole
output).

PR-6b.2's check that rejected any multi-RG input is replaced with:
reject only `prefix_len == 0` AND multi-RG (those still need PR-5's
LegacyMultiRGAdapter). Multi-RG metric-aligned inputs are now
accepted natively.

PR-6b.2 optimised the per-output schema based on per-output sort col
data (drop all-null cols, re-dict-encode low-cardinality strings).
With per-region streaming we don't know each region's content until
we drain it, so PR-6c.2 declares the writer's schema as the full
union schema and leaves output strings as Utf8. Per-output dict
re-encoding can be reintroduced later by tracking cardinality during
the streaming pass.

- All 9 PR-6b.2 tests still pass (single-RG input regression —
  behaviour preserved).
- New test_multi_rg_metric_aligned_input_produces_multi_rg_output:
  feeds a 2-RG metric-aligned input (prefix_len = 1, RG 0 =
  cpu.usage, RG 1 = memory.used); the streaming engine accepts it
  and produces a 2-RG output (one RG per metric_name region).
- Renamed test_multi_rg_input_rejected →
  test_legacy_multi_rg_input_rejected to reflect the new rejection
  scope (only prefix_len == 0 multi-RG is rejected; metric-aligned
  is accepted).

10/10 streaming tests pass. Clippy, doc, machete, fmt all clean.

1. File-size cap with sort-key-boundary splits.
2. Per-output schema optimisation (track region body-col cardinality
   during the streaming pass).
3. Mid-region splits at sorted_series transitions for finer-grained
   M:N control when callers want more outputs than regions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): reject duplicate-prefix RGs + use escape encoding

Two P1 bugs flagged by Codex on PR-6c.2 (#6410):

1. **Duplicate input row groups silently dropped.** When one input
   contained two RGs with the same composite prefix key,
   `process_region` overwrote `sort_col_batches[input_idx]` while
   `Region::total_rows` still counted both — losing rows and
   misaligning the body-col / sort-col mapping. Now enforce
   at-most-one-RG-per-input-per-prefix as a strong invariant at three
   sites: the merge read path (`extract_regions_from_metadata`), the
   streaming merge output finalize, and the indexing writer
   (`ParquetWriter::write_to_bytes` / `write_to_file_with_metadata`).
   The new `assert_unique_rg_prefix_keys` helper is shared.

2. **Byte-array prefix encoding broke lex order across lengths.**
   The 4-byte length prefix made `"b"` sort before `"aa"`, violating
   the declared ASC order. Switched to byte-stuffed escape encoding
   (`0x00` → `0x00 0x01`, terminator `0x00 0x00`), which preserves
   single-column lex order AND retains unambiguous concatenation for
   composite keys (the terminator is the smallest 2-byte sequence
   under escaping, so shorter values still sort before longer ones
   with the same prefix).

Tests:
- `test_byte_array_prefix_preserves_lex_order_across_lengths` —
  `"aa" < "b"`, empty < non-empty, shared-prefix shorter < longer,
  null-byte escaping preserves order.
- `test_streaming_merge_rejects_duplicate_prefix_rgs_in_one_input` —
  end-to-end bail with clear error.
- `test_write_to_bytes_rejects_duplicate_rg_prefix_when_claimed_aligned`
  + the `write_to_file` and single-RG positive counterparts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(streaming): split regions at sorted_series for prefix_len=0 multi-output

When inputs declare rg_partition_prefix_len = 0 (legacy single-RG)
and the caller asks for num_outputs > 1, the engine subdivides the
single region at sorted_series transitions in the merge order so it
can honor the output count. A single sorted_series run is never
broken; if one run exceeds the remaining budget the whole run lands
in one output anyway. The output inherits the input's
rg_partition_prefix_len (=0) — the engine does not synthesize a
prefix it can't unconditionally guarantee.

Also handles the giant-single-metric case (prefix_len=0, one
metric_name, num_outputs > 1): sorted_series transitions still
split the merge order even though there are no metric_name
transitions to drive a prefix synthesis.

Implementation:
- New `split_region_at_sorted_series` in region_grouping: walks the merge order and splits at
  sorted_series transitions when accumulated rows reach the target budget.
- Main engine loop: when num_outputs > current_output_idx + 1 AND region's rows exceed the
  remaining budget, drain sort cols for the region, compute merge order, call
  split_region_at_sorted_series, process sub-regions.
- Per-col page cache + cursor keyed by col_idx so the body-col path can read pages once and re-use
  them across sub-regions within the same top-level region. Resets between top-level regions
  (different RGs).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(streaming): correct 'crash' → 'bail' in MS-2 doc comments

The MS-2 validation path returns `Err` via `bail!()` (anyhow), not a
panic / abort. Five doc-comment / inline-comment sites described the
failure as "the engine would crash mid-merge" — overstated. Callers
get a `Result::Err` propagated up the spawn_blocking task and the
`streaming_merge_sorted_parquet_files` return.

Sites updated:
- `region_grouping.rs` module doc.
- `validate_region_order_matches_physical_rg_order` doc.
- streaming.rs MS-2 validation call-site comment.
- Test docstrings for `test_streaming_merge_with_desc_prefix_col` and
  `test_ms2_region_order_disagrees_with_physical_rg_order_rejected`.

No behaviour change. 477 lib tests pass; clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(streaming): fix wrong adapter type name + explain rejection intent

Two sites referenced a non-existent `LegacyMultiRGAdapter` — the
actual type, introduced in PR-5 (#6408), is `LegacyInputAdapter`
in `storage::legacy_adapter`. Fixed both references.

Also expanded the rejection-block comment to make the *intent* of
the guard explicit: it catches caller bugs (wiring a raw legacy
multi-RG `StreamingParquetReader` straight into the streaming
merge), not a degraded-input fallback. Production code routes
legacy splits through `merge::execute_merge_operation` which
wraps them in `LegacyInputAdapter` first.

No behaviour change. Targeted test passes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(body_assembler): tighten output-iter termination + assert invariant

adamtobey nit on PR #6424: `rows_emitted >= expected_rows` accepts
`emitted > expected` as a normal termination condition, which would
actually be a real accounting bug. The math rules `>` out by
construction — `page_size = remaining.min(OUTPUT_PAGE_ROWS)` where
`remaining = expected_rows - rows_emitted`, so each
`rows_emitted += page_size` keeps `rows_emitted ≤ expected_rows`.

Two changes:
- Termination becomes `rows_emitted == expected_rows` so we don't
  silently accept an overshoot.
- `debug_assert!(rows_emitted <= expected_rows, …)` at the top of
  `next()` documents the invariant and surfaces a regression loudly
  (panic in debug + tests) instead of silently terminating one
  iteration too late.

No behaviour change in the happy path; bugs that would have produced
`>` now fail tests instead of producing wrong output.

477 lib tests pass; clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): recompute split budget across the output-rollover boundary

Codex P1 finding on PR #6424: when a top-level region exactly fills
the current output (so `remaining_in_current == 0`) and the next
prefix-aligned region needs splitting, the split's first-sub-region
budget was the stale zero remainder of the about-to-be-finalized
output. `split_region_at_sorted_series` therefore cut after the
first sorted_series run, producing a tiny leftover plus a large
continuation that both inherited the parent region's prefix key.
The sub-region loop then rolled over to a fresh output and wrote
both pieces there, tripping the PA-3 duplicate-prefix-RG check in
`finalize_output`.

Fix: detect the rollover at decision time and compute
`effective_first_target` / `effective_outputs_remaining` against the
*next* output's empty budget. With the fix, the example above just
chooses `needs_split = false` (region fits the fresh output's full
target), processes the region whole, and rolls over cleanly.

Regression test `test_region_exactly_fills_output_does_not_split_next_aligned_region`
exercises the exact scenario Codex described: three 50-row RGs with
distinct (metric, service) prefixes, `num_outputs = 3`, target = 50.
Pre-fix, the merge bailed with PA-3 on output 1; post-fix, three
clean outputs each with one unique prefix key. Verified by reading
each output's parquet metadata back through
`assert_unique_rg_prefix_keys`.

478 lib tests pass (477 prior + 1 new); clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): reject null-mixed + all-null prefix RGs

Codex P1 on PR #6424: `extract_aligned_prefix_value` decided
prefix alignment purely from `min` / `max` statistics. Parquet
records those over non-null cells only, with `null_count` reported
separately, so two real failure modes slipped through:

1. **Mixed null + non-null.** A row group with `N` nulls plus a
   single non-null cell `"x"` reports `min == max == "x"` and the
   `min == max` check silently accepted it — but two distinct
   prefix keys (null and `"x"`) lived in that RG, breaking the
   at-most-one-prefix-value-per-RG invariant (PA-1).
2. **All-null RG.** Parquet records no `min` / `max` for an all-
   null chunk, so the legacy check bailed with the misleading "no
   min in stats" error. Logically the RG carries one prefix value
   (null) and is aligned — but supporting it cleanly requires a
   null marker in the composite-key encoding that agrees with
   SS-2's "nulls last" rule. `encode_byte_array_prefix(&[])` puts
   nulls *first*; coordinating that with SS-2 is a follow-up.

Fix: read `null_count_opt()` from stats and `num_values()` from
the column-chunk metadata. Bail explicitly in both cases — mixed
with a PA-1 message naming the (nulls, non-null) split, all-null
with a clear "not yet supported" pointer.

Two regression tests in `streaming.rs::tests`:
- `test_mixed_null_and_value_prefix_rg_rejected`: 1 RG, 3 cells
  `"cpu.usage"` + 1 null. Asserts PA-1 bail.
- `test_all_null_prefix_rg_rejected`: 1 RG, 3 nulls. Asserts the
  "all-null … not yet supported" bail.

480 lib tests pass (+2 new); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(streaming): share storekey prefix encoding with sorted_series

The per-RG composite prefix key now uses the same storekey-based
encoding as `sorted_series` — same `(ordinal, value)` layout, same
direction-inversion, same null-skip pattern — so a per-RG prefix
key is a literal byte prefix of every `sorted_series` value emitted
by rows in that RG.

Why: the prior byte-stuffed escape encoding had no in-line way to
represent an all-null prefix RG (an empty marker would lex-sort
before any present-value key, conflicting with SS-2 nulls-last).
With the shared encoding, an all-null column is skipped entirely
and the next column's higher ordinal byte appears in its place,
giving nulls-last ordering for free — the same trick already proven
in `sorted_series::encode_row_key`.

Per-column logic now goes through one helper:

  `crate::sorted_series::append_prefix_col_to_key(buf, ord, val, desc)`

shared between `sorted_series` (per-row keys) and
`merge::streaming::region_grouping` (per-RG keys). It writes
`storekey(ord) || storekey(val)` and inverts only the value bytes
for DESC columns. `sorted_series::encode_row_key` was refactored
to call the helper; the open-coded inline encoding is gone.

Trailing **prefix-length sentinel**: each per-RG key ends with a
`u8(prefix_len)` ordinal byte. This handles the prefix_len=1
edge case where an all-null RG's empty body would otherwise lex-sort
*before* any non-null RG — with the sentinel, the all-null key
becomes `[prefix_len]` and non-null keys still start with `ord(0)`
(< prefix_len), so non-null sorts first. The sentinel is also what
`sorted_series` writes immediately after the prefix cols, so the
literal-prefix property is preserved.

Null handling in `extract_rg_composite_prefix_key`:
- **All-null RG**: column skipped, RG groups into its own region (after non-null regions).
- **Mixed null + non-null**: rejected as a PA-1 violation (rows in the same RG would encode to
  two distinct prefix keys; producer is supposed to start a new RG at the null/non-null
  transition).
- **Otherwise**: standard `min == max` check, then the type-dispatched storekey encoding via the
  helper.

Removed:
- `extract_aligned_prefix_value` (replaced by `encode_prefix_col_value` which calls the helper).
- `encode_byte_array_prefix` (byte-stuffed escape, no longer used).
- `invert_for_descending` (the helper handles inversion per-column).
- `test_invert_for_descending_reverses_lex_order` and
  `test_byte_array_prefix_preserves_lex_order_across_lengths` (byte-level tests of the removed
  encoding; semantic properties remain enforced by `storekey`'s own tests plus the higher-level
  prefix tests).

Replaced `test_all_null_prefix_rg_rejected` with
`test_all_null_prefix_rg_groups_into_separate_region_sorted_last`:
builds two inputs (one with `metric_name = "cpu.usage"`, one with
`metric_name = NULL`) and verifies the merged output has two RGs
with the all-null region in RG 1 (sorted after the non-null
region) — pinning the nulls-last ordering that the sentinel
encoding produces.

Updated `test_extract_rg_composite_prefix_key_two_byte_array_cols`
for the new byte layout (`storekey(ord) || storekey(val)` per col
plus the trailing sentinel byte).

`PrefixColumn` gains an `ordinal: u8` field, populated from each
column's position in `qh.sort_fields` so it matches the ordinal
`sorted_series` would assign.

478 lib tests pass; workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(streaming): re-fmt to latest nightly rustfmt

CI's nightly rustfmt (1.9.0-nightly 2026-05-17) wrapped a handful
of comment / bail!-message / where-clause / vec! literal lines
slightly differently than my local nightly at commit time
(1.9.0-nightly 2026-05-11). Re-formatting all three affected files
catches the drift in this commit so CI Lints stops complaining;
local nightly is now updated to match CI.

No behaviour change. 478 lib tests still pass on the slice.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge): preserve metastore rg_partition_prefix_len from writer's KV stamp

The streaming merge engine produces sort-prefix-aligned multi-RG output
and stamps `qh.rg_partition_prefix_len = input_meta.rg_partition_prefix_len`
in the file's KV (verified by `assert_unique_rg_prefix_keys` before close).
`merge_parquet_split_metadata` then ran after and unconditionally demoted
to 0 whenever `output.num_row_groups > 1` — breaking CS-1 (metastore must
mirror on-disk KV) for every multi-RG streaming-engine output. Aligned
splits got tagged 0 in the metastore on every merge and leaked out of
the prefix-aligned compaction bucket on the next pass.

Carry the value the writer actually stamped via a new
`MergeOutputFile.output_rg_partition_prefix_len` field, then propagate
it as-is in metadata aggregation. Both engines populate the field:
- Legacy `merge/writer.rs` reports its demoted value (row-count-driven
  RG boundaries can't honor prefix alignment, so it stamps 0 on multi-RG).
- Streaming `merge/streaming/output.rs` reports the inputs' prefix
  unchanged (it splits at prefix transitions and the writer verifies).

CS-1 holds by construction — same source of truth, no re-derivation.

Tests:
- `test_output_prefix_len_demoted_when_multi_rg` → renamed to
  `test_output_prefix_len_carries_writers_value_when_demoted`; now
  asserts that the metastore mirrors the writer's reported value.
- New `test_output_prefix_len_preserved_on_multi_rg_streaming_engine`
  asserts that a multi-RG streaming output (writer reports prefix_len=2)
  keeps the prefix in the metastore — the regression case for F1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…_prefix_len (#6425)

* feat(legacy-adapter): synthesize prefix-aligned row groups

The legacy adapter previously consolidated multi-RG legacy inputs
into a single oversized row group and left `rg_partition_prefix_len`
at the original's (typically `0`). The streaming merge engine then
sent these single-RG/prefix=0 inputs through the new sub-region
splitting path — correct, but it forfeits the prefix-aware fast path
for outputs derived from legacy inputs and gives up the row-group
pruning that prefix alignment enables.

After consolidating, the adapter now slices the resulting record
batch at first-sort-col transitions (typically `metric_name`) and
emits one parquet row group per slice, stamping the re-encoded file
with `qh.rg_partition_prefix_len = 1`. The merge engine then reads
it through the prefix-aware fast path: one region per metric_name,
the existing duplicate-prefix invariant on read validates uniqueness.

Fallback: if the original file has no `qh.sort_fields` KV, the
sort-fields string fails to parse, the first column can't be
resolved in the arrow schema, or the consolidated batch is empty,
the adapter reverts to a single-RG re-encode without claiming any
prefix alignment. That input still works — the engine's
prefix_len=0 sub-region splitting path picks it up. This keeps the
adapter robust for files written by very early versions of the
indexer that may pre-date the standard KV layout.

Implementation: `reencode_prefix_aligned` replaces
`reencode_as_single_row_group` and either dispatches to the new
multi-RG writer or to the legacy single-RG writer based on whether
the first sort col is resolvable. `RowConverter` handles the
prefix-value equality check uniformly across dictionary, utf8, and
primitive types. The KV injection helper replaces (rather than
appends) any existing `qh.rg_partition_prefix_len` so re-runs and
files mistakenly carrying a stale value still land at the freshly
synthesized prefix.

Tests:
- `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`
  — 3 metrics × 40 rows, multi-RG input → 3 prefix-aligned output
  RGs and `qh.rg_partition_prefix_len = 1` KV.
- `test_legacy_input_single_metric_yields_one_rg_with_prefix_kv` —
  one metric → one RG, prefix KV still stamped (vacuously aligned).
- `test_legacy_input_without_sort_fields_falls_back_to_single_rg` —
  fallback path preserved when sort-fields KV is missing.
- All existing tests pass unchanged (they use empty KVs or
  unparseable sort-fields strings, both of which exercise the
  fallback path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(legacy-adapter): parameterize on target_prefix_len with composite-prefix support

`LegacyInputAdapter::try_open` now takes `target_prefix_len: u32`
chosen by the caller, matching the merge plan's consensus prefix
length. The adapter slices the consolidated batch at every transition
of the first N sort columns (composite key, via `RowConverter` over
all N fields) and emits one output row group per slice, stamping the
output with `qh.rg_partition_prefix_len = target_prefix_len`. With
`target_prefix_len = 0` the adapter takes the original single-RG
passthrough path with no prefix-alignment claim.

A sort column that is named in `qh.sort_fields` but missing from the
file's arrow schema is treated as implicitly null at every row per
SS-3. A constantly-null column trivially satisfies alignment on that
column (null == null) and contributes no transitions, so the split
boundaries are driven by the columns that are present. This matches
the merge engine's compaction-time treatment of missing columns and
keeps a legacy file with an evolved schema usable as a prefix-aligned
input.

`PrefixUnresolvable` now fires only on cases where the file doesn't
advertise enough sort *names* to honor the request:
- `qh.sort_fields` absent or unparseable
- `qh.sort_fields` declares fewer sort columns than `target_prefix_len`

A column missing from the arrow schema no longer counts as
unresolvable; the adapter materialises a `NullArray` of the batch's
length in that slot and proceeds.

Tests:
- `test_target_prefix_len_zero_passes_through_as_single_rg` — explicit
  N=0 fallback, no prefix KV stamped.
- `test_target_prefix_len_two_splits_by_metric_and_service` — composite
  prefix (`metric_name`, `service`) → 4 RGs, KV declares prefix_len=2.
- `test_target_prefix_len_one_without_sort_fields_returns_unresolvable`
  — no `qh.sort_fields` KV → `PrefixUnresolvable`.
- `test_target_prefix_len_exceeds_declared_sort_cols_returns_unresolvable`
  — sort schema declares 2 cols, caller asks 3 → `PrefixUnresolvable`.
- `test_missing_prefix_col_treated_as_null_satisfies_alignment` —
  sort schema declares `metric_name|env|-timestamp_secs` but `env`
  is absent from the arrow schema → no error, only metric_name
  transitions split RGs, KV still stamps prefix_len=2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(legacy_adapter): note where reader-side SS-3 handling lands

Codex P2 on PR #6425: the adapter records `None` for missing prefix
columns and stamps `rg_partition_prefix_len = target_prefix_len`
anyway. In isolation that produces a file with an advertised prefix
the current reader (`find_prefix_parquet_col_indices` on the #6425
state) bails on.

The reader-side fix — returning `Vec<Option<PrefixColumn>>` and
synthesizing a constant `[0x00, 0x00]` byte for `None` slots —
lands in PR #6426 (the hardening slice, F12 from the adversarial
review). The only caller of this adapter is
`execute_merge_operation`, introduced in PR #6423 which sits above
#6426 in the stack, so no production caller can produce a missing-
column prefix until the reader fix is in place.

Adding the in-code pointer so a future reader bisecting the stack
doesn't have to trace the relationship from scratch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge): consumer honors SS-3 (move F12 forward from #6426 to #6425)

Previously the F12 fix — "consumer side honors SS-3 missing prefix
columns" — lived in the hardening PR (#6426). At the #6425
isolation level, the legacy adapter records `None` for a prefix
column absent from the parquet schema and stamps
`rg_partition_prefix_len = target_prefix_len` on the output, but
the reader's `find_prefix_parquet_col_indices` bails on any missing
column. So #6425 + #6424 alone would produce a legacy-adapter file
that the streaming-merge reader rejects mid-merge — i.e. a known-
incoherent intermediate stack state.

Move F12 into this PR so the adapter and reader agree at the same
slice:

- `find_prefix_parquet_col_indices` now returns
  `Result<Vec<Option<PrefixColumn>>>`. `Some(_)` when the column
  is present in the parquet schema; `None` per SS-3 when the
  column is named in `qh.sort_fields` but absent from the schema.
- `extract_rg_composite_prefix_key` skips `None` slots entirely
  (no ordinal byte, no value bytes for that column). The trailing
  `u8(prefix_len)` sentinel introduced in the storekey refactor
  keeps the resulting key well-formed across present/absent
  columns.
- Callers that index into `prefix_cols` updated to use
  `.as_ref().expect(…)` where they assume presence.

Existing SS-3 test
`test_missing_prefix_col_treated_as_null_satisfies_alignment` in
`legacy_adapter.rs` gets an `assert_unique_rg_prefix_keys` call
verifying the adapter's output is consumable by the reader — pins
the "stack-coherent at #6425" property the F12 hop establishes.

Also incidental nightly-fmt cleanups in `sorted_series::append_prefix_col_to_key`
and the two-input fixture in `test_all_null_prefix_rg_groups_into_separate_region_sorted_last`.

The hardening PR (#6426) will be re-cascaded to drop the now-
duplicated F12 hunks (keeping its F8 adapter-rejects-unsorted +
F2 verifier-strength changes intact).

485 lib tests pass on this slice; workspace clippy + nightly fmt
clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(legacy-adapter): strip stale rg_partition_prefix_len when target=0

Codex P2 on PR #6425: when the legacy adapter is called with
`target_prefix_len == 0` it consolidates the input into a single
RG, but the previous version preserved the input's footer KVs
unchanged. If the input itself already carried a stale nonzero
`qh.rg_partition_prefix_len` claim (e.g., a prefix-aware split
being re-encoded through the legacy fallback path), the
single-RG output would still advertise that claim. Downstream
metadata extraction would take the prefix-aware path against an
RG carrying multiple first-prefix values — failing the PA-1
min/max alignment check on read despite the caller explicitly
asking for the legacy path.

Strip `PARQUET_META_RG_PARTITION_PREFIX_LEN` from `original_kv`
in the `target_prefix_len == 0` branch. Absence of the KV is
the legacy convention for "no alignment claim", matching the
existing `test_target_prefix_len_zero_passes_through_as_single_rg`
test's `prefix_kv.is_none()` assertion.

New regression test
`test_target_prefix_len_zero_strips_stale_prefix_kv_from_input`:
inputs a 2-RG file with `qh.rg_partition_prefix_len = "1"` AND
opens through adapter with `target_prefix_len = 0`; asserts the
re-encoded output has no prefix KV. Pre-fix this test caught the
leak; post-fix the stale value is dropped.

487 lib tests pass on the slice; clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…onger test verifiers (#6426)

Three adversarial-review findings on the prefix/RG machinery, bundled
because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.**
The adapter walked rows in physical order and emitted one RG per
prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`)
produced a 3-RG file where two RGs shared prefix `A`, violating PA-3.
The streaming merge engine would later reject it mid-merge — but only
after a quietly-bad file had been built. Now `compute_prefix_value_slices`
tracks each slice's composite prefix-value bytes and bails with
`LegacyAdapterError::InputNotSorted` on duplicates, surfacing the
SS-1 violation before any file lands on disk.

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while
wiring F2's chunk-level verifier into the SS-3 test).** The adapter
implements SS-3 correctly (missing-from-schema → synthesized NullArray
during slice computation, file stamps `prefix_len = N`). The streaming
engine's reader did not: `find_prefix_parquet_col_indices` hard-required
every named prefix column to be physically present, so a file the
adapter produced from an SS-3 input was unreadable by the merge engine.
Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>`
and `extract_rg_composite_prefix_key` emits a constant null marker
(`encode_byte_array_prefix(&[])`) for None slots. The column contributes
no cross-RG ordering signal (constant everywhere) so region boundaries
are driven entirely by the present columns. Both halves of SS-3 now
agree end-to-end.

Known limitation: cross-file SS-3 — where some inputs have a sort
column and others don't — uses [0x00, 0x00] for the null contribution,
which sorts BEFORE non-null per the encoded-empty-string convention.
That weakly violates SS-2 (nulls sort last). Single-file SS-3 is
correct because every RG in such a file contributes the same constant.
If cross-file SS-3 becomes a production scenario, the encoding needs
a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming
tests.** Tests asserting `num_row_groups == N` + KV stamped to N would
have passed even with an off-by-one in slice-boundary detection or
column-content scrambling. The verifier reads chunk-level statistics
directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness)
on the composite key. Wired into six tests:
- streaming engine: `test_streaming_merge_with_prefix_len_two`,
  `test_multi_rg_metric_aligned_input_produces_multi_rg_output`,
  `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`,
  `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`,
  `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now
  passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on
single-RG files — they still go through PA-1 because an unsorted
single-RG file CAN have `min != max` on a prefix column.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(merge): adapter rejects unsorted input; consumer honors SS-3; stronger test verifiers

Three adversarial-review findings on the prefix/RG machinery, bundled
because they touch the same producer/consumer contract:

**F8: Legacy adapter rejects SS-1-violating input upfront.**
The adapter walked rows in physical order and emitted one RG per
prefix-value run. An unsorted legacy input (rows `[A,A,B,B,A,A]`)
produced a 3-RG file where two RGs shared prefix `A`, violating PA-3.
The streaming merge engine would later reject it mid-merge — but only
after a quietly-bad file had been built. Now `compute_prefix_value_slices`
tracks each slice's composite prefix-value bytes and bails with
`LegacyAdapterError::InputNotSorted` on duplicates, surfacing the
SS-1 violation before any file lands on disk.

**F12: Consumer-side SS-3 (cross-layer divergence, discovered while
wiring F2's chunk-level verifier into the SS-3 test).** The adapter
implements SS-3 correctly (missing-from-schema → synthesized NullArray
during slice computation, file stamps `prefix_len = N`). The streaming
engine's reader did not: `find_prefix_parquet_col_indices` hard-required
every named prefix column to be physically present, so a file the
adapter produced from an SS-3 input was unreadable by the merge engine.
Now `find_prefix_parquet_col_indices` returns `Vec<Option<PrefixColumn>>`
and `extract_rg_composite_prefix_key` emits a constant null marker
(`encode_byte_array_prefix(&[])`) for None slots. The column contributes
no cross-RG ordering signal (constant everywhere) so region boundaries
are driven entirely by the present columns. Both halves of SS-3 now
agree end-to-end.

Known limitation: cross-file SS-3 — where some inputs have a sort
column and others don't — uses [0x00, 0x00] for the null contribution,
which sorts BEFORE non-null per the encoded-empty-string convention.
That weakly violates SS-2 (nulls sort last). Single-file SS-3 is
correct because every RG in such a file contributes the same constant.
If cross-file SS-3 becomes a production scenario, the encoding needs
a leading-0xff sentinel instead. Not exercised today.

**F2/F9/F11: Wire `assert_unique_rg_prefix_keys` into prefix-claiming
tests.** Tests asserting `num_row_groups == N` + KV stamped to N would
have passed even with an off-by-one in slice-boundary detection or
column-content scrambling. The verifier reads chunk-level statistics
directly: PA-1 (intra-RG `min == max`) + PA-3 (inter-RG uniqueness)
on the composite key. Wired into six tests:
- streaming engine: `test_streaming_merge_with_prefix_len_two`,
  `test_multi_rg_metric_aligned_input_produces_multi_rg_output`,
  `test_streaming_merge_with_desc_prefix_col`
- legacy adapter: `test_target_prefix_len_two_splits_by_metric_and_service`,
  `test_legacy_input_with_sort_fields_produces_prefix_aligned_multi_rg`,
  `test_missing_prefix_col_treated_as_null_satisfies_alignment` (now
  passes thanks to F12).

Also: `assert_unique_rg_prefix_keys` no longer short-circuits on
single-RG files — they still go through PA-1 because an unsorted
single-RG file CAN have `min != max` on a prefix column.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(merge): legacy-prefix promotion path + schema-evolution body cols

Two adversarial-review follow-ups grouped because they share the
streaming engine's input-routing and union-schema seams.

## (b) Legacy-prefix promotion

A new operation type pairs a prefix_len=0 split with prefix_len>0
peers in one merge, so legacy splits can be folded into prefix-
aligned buckets instead of aging out via retention. Adds:

- `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)`: relaxes MP-3 to allow mixed
  `rg_partition_prefix_len` as long as every input is `<= target`. Sort_fields + window equality
  unchanged.
- `ParquetMergeOperation::target_prefix_len_override: Option<u32>` field records the promotion
  target; `None` is the default regular-merge form.
- `merge_parquet_split_metadata(..., mixed_prefix_ok)`: skips the input-side prefix-len equality
  check in promotion mode. The output prefix_len still comes from the writer's KV stamp via
  `MergeOutputFile.output_rg_partition_prefix_len` (CS-1 holds by construction post-F1).
- `merge::execute_merge_operation(op, sources, ...)`: new thin executor that opens each input as
  either `LegacyInputAdapter` (when `split.rg_partition_prefix_len < target`) or
  `StreamingParquetReader` (otherwise), then feeds them to the streaming engine. Becomes the seam
  PR-7 will wire from above.

Tests:
- `test_promote_legacy_pairs_legacy_with_aligned_peer`, `test_promote_legacy_rejects_higher_prefix_input`,
  `test_promote_legacy_still_enforces_sort_fields`, `test_promote_legacy_all_at_target_is_valid`.
- `test_mixed_prefix_ok_skips_input_equality_check`.
- `test_promote_legacy_executor_end_to_end`: legacy single-RG + aligned multi-RG → 3-RG output
  passing `assert_unique_rg_prefix_keys` with `prefix_len = 1`, plus metastore CS-1.
- `test_executor_mismatched_sources_count_bails`.

## F6 + F13: Schema evolution for body columns

The merger now supports MC-4 across heterogeneous body-col schemas:

- F6: `normalize_type` collapses `Binary`/`LargeBinary` (and dict variants) to `Binary`, analogous
  to the existing string-flavour collapse. Two inputs whose body col differs only by byte-array
  flavour merge cleanly; before this they hit a "type conflict" at alignment time.
- F13: `streaming_writer.rs::write_list_via_serialized_column_writer` (renamed from
  `..._non_nullable_...`) now handles nullable outer `List<T>` / `LargeList<T>`. MC-4 forces the
  union to be nullable when a List col is present in only some inputs; before this the writer
  rejected the merged output. Uses Dremel max_def_level = 2 (0 = outer null, 1 = empty list, 2 =
  element present) for nullable outer; non-nullable path unchanged.

Test: `test_mc2_mixed_schemas_round_trip` builds two inputs A and B
with the same sort schema but different body cols (Utf8 vs
Dict<Utf8>, LargeBinary vs Binary, List<Float64> in A only, Int32
A-only, Int64 B-only, common Float64). The merge produces the
union schema; per-row rendering via `render_cell` matches across
flavour boundaries; List cells from B render as nulls.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(indexing): re-fmt parquet_merge_executor to latest nightly rustfmt

Same nightly-rustfmt drift as the storekey commit on #6424
(local nightly 2026-05-11 vs CI's 2026-05-17): the `mixed_prefix_ok`
binding and the `merge_parquet_split_metadata` call now fit
single-line under the newer width heuristics. No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(merge-executor): route promotion merges through execute_merge_operation

Codex P1 on PR #6423: the executor unconditionally called the
in-memory `merge_sorted_parquet_files` path, which routes through
`extract_and_validate_input_metadata` and bails on mixed
`qh.rg_partition_prefix_len` before any output is produced. So a
real promotion merge — `prefix_len = 0` plus `prefix_len = 1` with
`target_prefix_len_override = Some(1)` — failed before reaching the
downstream `mixed_prefix_ok` plumbing in
`merge_parquet_split_metadata`. The escape hatch existed but was
unreachable for actual promotion inputs.

Fix: branch in the executor's handle on
`target_prefix_len_override.is_some()`. Promotion merges go through
the engine's streaming entry point
`quickwit_parquet_engine::merge::execute_merge_operation`, which
opens each below-target input via `LegacyInputAdapter` and each
at-target input directly. The streaming merge then sees a
homogeneous stream advertising `prefix_len = target` on every
input. Regular (non-promotion) merges keep the in-memory path.

`execute_merge_operation` expects `Vec<Arc<dyn RemoteByteSource>>`
parallel to `op.splits` — the engine deliberately doesn't depend
on `quickwit-storage` (would invert layering and pull cloud SDKs
into a pure parquet library). So this commit adds
`LocalFileByteSource`: a tiny `RemoteByteSource` impl backed by
`tokio::fs::File`, one instance per downloaded split, each bound
to its scratch-directory path. The `path: &Path` argument on the
trait surface is ignored — the downloader has already resolved
each split to a concrete local file before the executor runs.

Coverage:
- Library-level: `quickwit-parquet-engine::merge::streaming::tests::test_promote_legacy_executor_end_to_end`
  already exercises `execute_merge_operation` with a
  `prefix_len = 0` + `prefix_len = 1` pair, verifying the output
  advertises `prefix_len = 1` and passes PA-1 + PA-3 on the
  composite key. That's now the same code path the in-tree
  executor takes.
- Module doc on the executor rewritten to spell out which path runs
  when.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track legacy promotion planner gap as GAP-011

The streaming Parquet merge stack landing in #6424#6428 ships the
full legacy-promotion *mechanism* (engine + adapter + executor
wiring) but not the planner-level *trigger*. In production today,
`MergePolicyState::record_split` buckets by
`CompactionScope::from_split` which includes
`rg_partition_prefix_len`, so legacy (prefix=0) and aligned
(prefix>0) splits are separated before `ParquetMergePolicy::operations`
runs. The policy only emits `ParquetMergeOperation::new`; a
repo-wide search finds `promote_legacy` only in tests. Legacy
splits therefore never migrate without an explicit trigger.

Tracking this as GAP-011 so we pick it up at the right time. The
gap doc walks three resolution options (merge buckets in the scope
key, dedicated promotion pass, or hybrid prefer-multi-input-promotion)
and the cost trade-offs between them, so the eventual implementation
PR has a starting point.

Raised by Codex review comment id 4311184497 on PR #6423.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): track download-vs-streaming merge executor gap as GAP-012

The Parquet streaming merge engine is built around `RemoteByteSource`
and was designed to pull pages directly from object storage — two
GETs per input, overlap fetch with merge, no scratch disk. The
production actor pipeline doesn't take that path: a downloader actor
materializes every input on local disk first, and the executor wraps
the local files in a `LocalFileByteSource` to feed `execute_merge_operation`
(or just calls the in-memory `merge_sorted_parquet_files` path). The
streaming engine's central design benefit is unused.

This isn't a correctness bug — both paths give the same result. It's
a perf/architecture gap: every merge pays 2× I/O per input
(network → scratch + scratch → merger), serializes phases
(`max(input download time)` first-byte latency), and consumes scratch
disk that scales with concurrent merges.

Tracking as GAP-012 so we pick it up at the right time. The gap doc
walks four options (stream-directly with download fallback, stream-
by-default with circuit breaker, eliminate in-memory path only,
stream-directly for promotion merges only) and the trade-offs between
them — including the mid-merge retry surface, which is the main
reason download-first is the current default.

Surfaced during PR #6423 code walkthrough.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…gion engine fix (#6428)

* feat(merge): close F4/F5/F7/F14 from the adversarial review

Three test additions plus one engine fix surfaced by the F4 tests.

The existing MS-7 test proved the per-input page-cache bound for one
input and one region. F4 extends the coverage:

- `test_ms7_per_input_bound_across_num_inputs` sweeps `num_inputs ∈ {1, 3, 8}` × `rows_per_input ∈
  {3 000, 30 000}` and asserts the per-input peak stays bounded. Cross-axis growth check: going
  from 1 input to 8 must not push the peak up.
- `test_ms7_per_input_bound_across_sub_regions_does_not_scale_with_rows` runs the prefix_len=0
  multi-output sub-region path at 3 000 vs 30 000 rows and asserts peak doesn't scale with input
  row count. **This test surfaced F14 (below) — without the engine fix, the sub-region path's peak
  grew ~9× when rows grew 10×.**

Tests serialize via `ms7_serial_lock` because
`PEAK_BODY_COL_PAGE_CACHE_LEN` is process-global; concurrent tests
would pollute each other's readings.

Parquet streams emit pages in column-major order (all of col 0,
then all of col 1, ...). The old sub-region-outer / col-inner
ordering meant that while processing sub-region 0's col K, the
stream emitted cols 0..K-1's remaining pages first to reach col K —
those skipped pages got cached under their own col_idx for later
sub-regions to consume, and the cache scaled with input row count.

Fix: new `process_split_region_col_outer` function for the
`needs_split` path. Cols iterate in the outer loop, sub-regions in
the inner. Each parquet col chunk is fully consumed from the stream
across all sub-regions before col K+1 starts. Cache for col K is
empty before col K+1's pages arrive.

Mechanics: pre-determine writer assignments for the region's
sub-regions (a top-level region's sub-regions may span multiple
output writers; consecutive sub-regions on the same writer get
coalesced into one combined Region so each writer holds one RG
concurrently — RGs on the same writer are sequential, so coalescing
keeps the parquet writer's single-active-RG constraint intact).
Single-region path stays on the existing `process_region`.

`prop_merge_prefix_aligned_streaming` sweeps `(num_inputs ∈ 1..=3,
per-input RG specs, num_outputs ∈ 1..=3)` with prefix_len=1 and
asserts MC-1 (rows preserved), MC-3 (sorted_series monotone within
each output), MS-3 (num_row_groups matches footer), PA-1+PA-3
(`assert_unique_rg_prefix_keys`), and CS-1 (metastore prefix_len ==
KV) on every generated case. 32 cases capped to keep runtime under
a second.

Fixture: `make_prefix_len_one_input` writes one RG per
`(metric_name, rows)` entry by calling `writer.flush()` between
batches. `sorted_series` encodes
`metric_base + row_offset_within_metric`, mirroring production's
storekey property that different metric_names produce
non-overlapping `sorted_series` byte ranges.

Plus a focused unit test `test_f5_single_input_two_metrics_minimal`
that pins one specific case for fast iteration.

`test_f7_production_shape_multi_input_multi_rg_multi_output`: 5
inputs × ~4 prefix-aligned RGs each × 4 outputs × prefix_len=1.
Asserts the full invariant bundle (MC-1, MS-3, PA-1+PA-3, MS-5
cross-output sorted_series monotonicity, CS-1) — the corner the
adversarial review flagged as "untested production case".

MS-5 is "across adjacent outputs, sorted_series is monotone
non-decreasing." A single metric CAN span outputs (the engine
splits at sorted_series transitions inside an overflowing region),
so the cross-output invariant is sorted_series monotonicity, not
"each metric in one output."

- `cargo test -p quickwit-parquet-engine --lib` — 498 unit tests pass.
- `cargo clippy -p quickwit-parquet-engine --tests --all-features` with `-Dwarnings`.
- `cargo doc --no-deps -p quickwit-parquet-engine` warning-free.
- `cargo fmt --all -- --check` (nightly via PATH override).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style(streaming): drop stray blank line before tests section header

Newer nightly rustfmt (2026-05-17) flags the extra blank line that
crept into the test module between the F4 fixture helper and the
"Heterogeneous-output regressions" section header. Single-line
gap is what nightly fmt wants.

No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(streaming): roll over chunk-assignment before first chunk after split

Codex P1 on PR #6428: the previous "Recompute split budget after
rolling over" fix (commit 56e773f, #6424) handled the split
*decision* but not the split *assignment*. When the previous region
fills the current output exactly and the next region enters the
`needs_split` path, the chunk-assignment loop in
`process_split_region_col_outer`'s setup initializes from the stale
`current_output_idx` / `current_output_rows`. Its inner
`needs_new_writer` check guards on `!chunk_assignments.is_empty()`,
so the first iteration cannot roll over: the first sub-region is
appended to the already-full output and only the second one
advances. Output K ends up at 2× target while subsequent outputs
are short or empty.

Fix: initialize `active_output_idx` / `active_rows` from the
`will_roll_over` case before the loop. The inner `needs_new_writer`
check then works for both the first and subsequent iterations (on
the first iteration `active_rows = 0 < target` so it correctly
doesn't re-roll). The `can_reuse_current` check in the writer-
materialization loop already handles "first chunk's output_idx
doesn't match current_writer" by finalizing the current output
(which is correct: it's full, close it) and opening a fresh writer
at the next index.

Regression test
`test_split_chunk_assignment_rolls_over_before_first_chunk`:
prefix_len=1, two metrics of 200 + 400 rows = 600 total,
`num_outputs = 3` → `target_per_output = 200`. Region A fills
output 0 exactly; region B needs splitting. Pre-fix the merge
produced 2 outputs of 400 + 200 (output 0 overfilled, output 2
empty); post-fix it produces 3 outputs of ~200 rows each.

502 lib tests pass (+1); workspace clippy + nightly fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ming engine (default off, no-op rollout) (#6441)

* test(merge): engine-parity tests + share MS-7 serial lock

Adds `merge::tests::parity` with two tests that run the same realistic
input fixture through both `merge_sorted_parquet_files` (in-memory
engine) and `execute_merge_operation` (streaming engine over the same
`LocalFileByteSource` the executor uses in production), then asserts
row-by-row equivalence on every visible column. These gate the upcoming
YAML flag that flips regular merges to the streaming engine: parity
must hold before the default is flipped in production.

The streaming engine writes a process-global atomic
(`PEAK_BODY_COL_PAGE_CACHE_LEN`) that the MS-7 tests reset-then-read.
Any test that runs a streaming merge must serialise against MS-7 or
inflate its readings. Move `ms7_serial_lock` from the streaming-tests
submodule to module scope (still `#[cfg(test)] pub(crate)`) so the new
parity tests acquire the same lock.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(merge-executor): YAML flag to route regular merges through the streaming engine

Wires the streaming Parquet merge engine into the regular (non-promotion)
merge path behind a node-level YAML flag,
`indexer.parquet_merge_use_streaming_engine`, defaulted to false. When
true, `ParquetMergeExecutor::handle` runs every merge through
`execute_merge_operation` (the column-major, page-bounded streaming
engine) instead of the in-memory `merge_sorted_parquet_files`.
Promotion merges (`target_prefix_len_override.is_some()`) continue to
take the streaming path unconditionally — the in-memory engine can't
handle mixed `rg_partition_prefix_len` inputs.

The in-memory engine stays in place as the runtime fallback. If the
streaming engine hits a bug in production, an operator can flip the
flag back to `false` via YAML without redeploying. Once the streaming
path has soaked, the fallback branch and `merge_sorted_parquet_files`
itself can be removed.

The flag is plumbed `IndexerConfig` → `IndexingService` →
`ParquetMergePipelineParams` → `ParquetMergeExecutor::new`, and exercised
end-to-end by the engine parity tests in the previous commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(config): exercise parquet_merge_use_streaming_engine in YAML/JSON/TOML fixtures

Adds the new flag to the YAML, JSON, and TOML node-config test fixtures
and bumps the expected `IndexerConfig` in `node_config_parse_*` to
`parquet_merge_use_streaming_engine: true`. Catches parse / serde
regressions on the field — e.g., a rename or a default-fn typo would
fail the test instead of silently parsing as `false`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(merge-pipeline): end-to-end streaming-engine flag verification

Adds `test_merge_pipeline_end_to_end_with_streaming_engine_flag`, an
integration test that runs the full actor chain (planner → downloader
→ executor → uploader → publisher) with
`ParquetMergePipelineParams::use_streaming_engine = true`. Asserts:

1. Publish fired with the right replaced_split_ids (merge ran
   end-to-end through the executor).
2. `PEAK_BODY_COL_PAGE_CACHE_LEN > 0` after the merge. The streaming
   engine increments this on every body-col page assembly; the
   in-memory engine never touches it. Non-zero is direct evidence
   the streaming path executed — not a silent fallback to in-memory.
3. The merge output row count and metric names are correct.

To make assertion (2) work cross-crate, exposes
`PEAK_BODY_COL_PAGE_CACHE_LEN` as `pub` under
`#[cfg(any(test, feature = "testsuite"))]`. The visibility widening is
test-only — production builds never see the symbol.

This is the closest analog to the sesh-mode "production-path" rule
that is feasible today: the metrics pipeline's OTLP gRPC ingest path
is not yet wired into `quickwit-serve`, so the closest end-to-end
test is the actor-chain integration test that this PR adds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(adr): record dual Parquet merge engines as deviation #1

Captures the intentional, time-bounded divergence from ADR-003 §4
introduced by the streaming-engine wire-in: two engines coexist in
production behind `IndexerConfig::parquet_merge_use_streaming_engine`,
with the in-memory engine retained as the runtime fallback.

Documents:
- The ADR-003 §4 quote the deviation diverges from (page-granular
  streaming, bounded memory).
- The current dual-engine implementation and routing logic.
- Why this exists (production safety, staged rollout, parity is
  strong-but-not-total).
- Explicit exit criteria: default flipped to `true`, ≥ 2-week
  production soak with no merge-correctness incidents, no rollback.
  When met, a follow-up PR deletes the in-memory branch and engine,
  the flag, and the parity tests.

This is the first deviation recorded under the EVOLUTION.md framework.
Indexes the doc in `deviations/README.md`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(merge-pipeline): share full correctness contract between both engine tests

Extracts the steps-5-through-8 assertions (replaced_split_ids, staged
metadata, Parquet file content, Parquet KV headers) into
`assert_cpu_mem_merge_outputs_correct` and calls it from both
`test_merge_pipeline_end_to_end` (in-memory engine) and
`test_merge_pipeline_end_to_end_with_streaming_engine_flag` (streaming
engine). The streaming-engine test had been doing only a small subset
of the checks — row count and metric names. It now runs the full
contract: time_range, num_merge_ops, sort_fields, row_keys_proto,
zonemap_regexes, low_cardinality_tags, all 100 timestamps,
sorted_series monotonicity, cpu/mem sort-order semantics, and every
`qh.*` Parquet KV header.

By construction both engines must produce a file that satisfies the
same contract — the helper is the executable parity between engines
at the pipeline-integration level, complementing the column-level
parity tests in `quickwit-parquet-engine::merge::tests::parity`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(merge-pipeline,merge-engine): multi-metric + multi-RG + m:n disjointness

Expands test coverage along three axes the existing helpers didn't hit:

1. **Multi-input, multi-metric pipeline tests** (new file
   `parquet_merge_pipeline_multi_metric_test.rs`). Three inputs, each
   carrying three metrics with overlapping per-metric timeseries IDs
   and overlapping-but-distinct timestamps — the merge must
   row-by-row interleave across all three inputs. Output writer uses
   `row_group_size = 50` so the 180-row merge output breaks into
   four row groups, exercising the writer's multi-RG path in both
   engines. Both engine variants (in-memory + streaming) covered.
   Streaming-engine test asserts `PEAK_BODY_COL_PAGE_CACHE_LEN > 0`
   to confirm the flag routed through the streaming path.

2. **Engine-level multi-output contract** in
   `merge::tests::parity::assert_engine_parity`. Beyond the existing
   engine-vs-engine column equivalence, every parity test now also
   verifies on the in-memory engine's outputs (equivalent to the
   streaming engine's): sum of per-output row counts equals total
   input rows, each output internally monotonic on `sorted_series`,
   and across outputs the partition is disjoint (no two outputs
   share any `sorted_series` value). This is the m:n
   non-overlap contract.

3. **Multi-metric overlapping-input m:n** test
   `parity_multi_metric_overlapping_inputs_multi_output` exercises
   the strengthened contract with three inputs × three metrics where
   per-metric keyspaces overlap across inputs. n = 3 outputs target.

Honest scope note in the new pipeline test module's doc: the actor
pipeline today hardcodes `num_outputs = 1` in `ParquetMergeExecutor`,
so n > 1 is not reachable end-to-end through the actor system. The
new engine-level test covers the n > 1 correctness contract for now;
when the executor is taught to accept `num_outputs > 1` from the
merge policy, the pipeline tests can grow an n > 1 variant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(merge-executor): compute num_outputs from target_split_size_bytes

Replaces the hardcoded `MergeConfig { num_outputs: 1, ... }` in
`ParquetMergeExecutor::handle` with a per-merge computation:

    num_outputs = max(1, ceil(total_input_bytes / target_split_size_bytes))

So a merge that ingests more than one target's worth of data spreads
across multiple output files; merges that fit in one target keep
producing a single output (preserving today's behavior for the
common case). The engine clamps the request to the number of
`sorted_series` boundaries actually available, so the value is an
upper bound, not an exact count.

Plumbing: `IndexerConfig` already carries `target_split_size_bytes`
in `ParquetMergePolicyConfig`. Pass that through
`ParquetMergePipelineParams.target_split_size_bytes` →
`ParquetMergeExecutor::new`. Default for tests:
`256 * 1024 * 1024` (matches the production default).

Latent multi-output bug fixed at the same time: with n>1, the
executor used to assign the planner-supplied `merge_split_id` to
**every** output split, which would have collided on the rename to
`{split_id}.parquet`. First output keeps the planner ID for
observability continuity; subsequent outputs use the fresh IDs
generated by `merge_parquet_split_metadata`.

Also exposes `quickwit_parquet_engine::merge::streaming::ms7_serial_lock`
as `pub` under the `testsuite` feature so cross-crate streaming tests
(in `quickwit-indexing`) can serialise against the same global lock
the in-crate MS-7 tests use. The streaming engine writes to a
process-global atomic on every merge — without shared locking, the
existing pipeline streaming-engine test races `store(0)` against
other tests' merges. Adds the appropriate
`#[allow(clippy::await_holding_lock)]` to the in-crate
`test_merge_pipeline_end_to_end_with_streaming_engine_flag` to
match.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(merge-pipeline): bonus — prefix_len=1 multi-RG inputs + m:n outputs

Adds the bonus scenario: three multi-metric inputs each written with
`rg_partition_prefix_len = 1` and one row group per distinct
metric_name (via `row_group_size = ROWS_PER_METRIC_PER_INPUT` so the
writer flushes at every metric boundary after sorting). Merged with a
small `ParquetMergePipelineParams::target_split_size_bytes = 500`
that forces the executor's `num_outputs` calculation to ask the
engine for multiple outputs — exercising the m:n merge path now
reachable through the actor pipeline (PR's earlier commit removed
the `num_outputs = 1` hardcode).

Both engines covered:

- `test_prefix_aligned_multi_metric_three_input_multi_output_in_memory_engine`
- `test_prefix_aligned_multi_metric_three_input_multi_output_streaming_engine`

The streaming-engine variant also asserts
`PEAK_BODY_COL_PAGE_CACHE_LEN > 0` (under `ms7_serial_lock`) so a
silent fallback to the in-memory path would fail.

The shared assertion helper
`assert_three_input_three_metric_multi_output_correct` checks the
m:n contract end-to-end at the pipeline level:

- All three input splits replaced.
- ≥ 2 output splits staged (proves splitting happened).
- Sum of per-output row counts = total input rows.
- Each output internally monotonic on `sorted_series`.
- Across outputs, the `sorted_series` partition is disjoint — no
  two outputs share any key, which is the "non-overlapping output"
  contract the engine promises.
- Union of metric_names / services across outputs = full input set.
- Every output has `num_merge_ops = 1`, `row_keys_proto`, and a
  `metric_name` zonemap regex.

To pin the test to exactly one merge (not a cascade of merges over
the now-multiple staged outputs), `make_pipeline_params` now takes
`max_merge_ops` and the bonus tests set it to `1`: outputs land at
`num_merge_ops = 1`, equal to the policy ceiling, and the planner
refuses to merge them again. The existing n=1 tests stay at 5
(headroom — they produce a single output that can't trigger another
merge anyway, since `merge_factor = 3`).

Updates the module doc to drop the now-stale scope note about m:n
not being reachable through the pipeline.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style: nightly rustfmt + drop useless borrows in assert!

Reformats doc comments / format strings under nightly rustfmt
(`wrap_comments`, `format_strings`), and removes two redundant `&` in
`assert!` arguments flagged by clippy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/parquet-streaming-base branch from 81983ce to 7196e8e Compare May 20, 2026 14:10
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7196e8e788

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

for (split, source) in op.splits.iter().zip(sources) {
let path = PathBuf::from(&split.parquet_file);
let stream: Box<dyn ColumnPageStream> = match op.target_prefix_len_override {
Some(target) if split.rg_partition_prefix_len < target => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Adapt target-0 legacy promotion inputs

When a promotion operation uses target_prefix_len_override = Some(0) (which promote_legacy accepts for all prefix-0 inputs, and LegacyInputAdapter documents as the fallback when there is no non-legacy peer), this guard is false for every legacy split, so multi-row-group prefix-0 inputs are opened directly. streaming_merge_sorted_parquet_files then rejects raw prefix-0 multi-RG streams, so all-legacy promotion jobs fail instead of being normalized through the adapter. Fresh evidence beyond the existing regular-merge comment is that the same branch also breaks target-0 promotion operations.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

@trinity-1686a trinity-1686a left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay for the few files quickwit-core has exclusive ownership

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants