178 changes: 178 additions & 0 deletions docs/internals/adr/deviations/001-dual-parquet-merge-engines.md
@@ -0,0 +1,178 @@
# Deviation 001: Dual Parquet merge engines during streaming-engine rollout

## Summary

Two Parquet merge engines coexist in production behind a runtime YAML flag.
The streaming engine (`execute_merge_operation`) matches the intent of
[ADR-003 §4](../003-time-windowed-sorted-compaction.md) (page-granular
streaming, bounded memory). The in-memory engine
(`merge_sorted_parquet_files`) is retained as the runtime fallback so an
operator can flip back via configuration if the streaming engine hits a
production bug. The dual-engine state is intentional and time-bounded —
it ends when the streaming engine has soaked at the new default in
production.

## Related ADR

- **ADR**: [ADR-003 Time-Windowed Sorted Compaction](../003-time-windowed-sorted-compaction.md)
- **Section**: §4 Sorted Merge, Phase 2 (column streaming)

## ADR States

> Phase 2: Stream columns through the merge.
>
> Once the global sort order is determined, each column is read from the
> input splits and written to the output in sorted order. Columns are
> processed one at a time (or in small groups) for memory efficiency.
>
> For large columns, it may be advantageous to operate at **page
> granularity** rather than loading an entire column from each input:
> read individual Parquet pages from inputs as needed and write
> individual pages to the output. This bounds memory usage for columns
> with large values (e.g., high-cardinality string tags, large attribute
> maps) and avoids materializing an entire column across all inputs
> simultaneously.

ADR-003 §4 describes the merge as a streaming operation that bounds
memory by reading and writing pages incrementally. The in-memory
`merge_sorted_parquet_files` engine pre-materializes whole columns from
all inputs simultaneously — directly contrary to the ADR's stated
memory model.
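
The difference between the two memory models can be sketched in plain Rust. This is a minimal illustration under stated assumptions, not the engine's code: it merges sorted `i64` iterators (a stand-in for a single sort column) rather than Parquet pages, and `streaming_merge`, `page_rows`, and `emit_page` are hypothetical names. The heap holds at most one pending value per input and the output buffer holds at most `page_rows` values, so buffered state does not grow with total input size.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges already-sorted inputs and hands the result to `emit_page` in
/// chunks of at most `page_rows` values. Hypothetical sketch: real inputs
/// would be Parquet pages, not `i64` iterators.
fn streaming_merge<I, F>(mut inputs: Vec<I>, page_rows: usize, mut emit_page: F)
where
    I: Iterator<Item = i64>,
    F: FnMut(Vec<i64>),
{
    assert!(page_rows > 0, "page_rows must be positive");

    // Min-heap keyed by (value, input index): one pending value per input.
    let mut heap = BinaryHeap::new();
    for (idx, input) in inputs.iter_mut().enumerate() {
        if let Some(value) = input.next() {
            heap.push(Reverse((value, idx)));
        }
    }

    let mut page = Vec::with_capacity(page_rows);
    while let Some(Reverse((value, idx))) = heap.pop() {
        page.push(value);
        if page.len() == page_rows {
            // Flush a full output page and start a fresh one.
            emit_page(std::mem::replace(&mut page, Vec::with_capacity(page_rows)));
        }
        // Refill the heap from the input that just produced a value.
        if let Some(next) = inputs[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    if !page.is_empty() {
        emit_page(page);
    }
}
```

An in-memory merge in the same toy setting would instead collect every input into one `Vec` and sort it, which buffers all rows at once.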

## Current Implementation

Both engines live in `quickwit-parquet-engine/src/merge/`:

- **Streaming engine** (`execute_merge_operation`, in `merge/mod.rs`,
backed by `merge/streaming.rs`). Column-major, page-bounded body cache,
reads inputs through `RemoteByteSource`. This is the
ADR-003-compliant implementation. It is the unconditional path for
promotion merges (the in-memory path can't handle mixed
`rg_partition_prefix_len`) and the opt-in path for regular merges.
- **In-memory engine** (`merge_sorted_parquet_files`, in `merge/mod.rs`).
Buffers all inputs through arrow-rs into memory, runs the merge under
`run_cpu_intensive`. This is the original bootstrap implementation
retained as the runtime fallback.

`ParquetMergeExecutor::handle` routes between them:

```rust
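// Promotion merges always take the streaming engine; regular merges take it
// only when the node-level flag is set.
let is_promotion = scratch.merge_operation.target_prefix_len_override.is_some();
if is_promotion || self.use_streaming_engine {
    execute_merge_operation(&op, sources, &output_dir, &config).await
} else {
    // Fallback: the in-memory engine, run off the async runtime.
    run_cpu_intensive(move || {
        merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config)
    }).await
}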
```

The `use_streaming_engine` boolean is sourced from the node-level
`IndexerConfig::parquet_merge_use_streaming_engine` YAML field, default
`false`.
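
The routing rule can be isolated as a pure function, which makes the four flag/promotion combinations easy to check. This is a sketch only: `Engine` and `select_engine` are hypothetical names, and the `u32` payload of the override is an assumption (the source does not show its type).

```rust
/// Which merge engine handles an operation (illustrative names).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Engine {
    Streaming,
    InMemory,
}

/// Mirrors the branch in `ParquetMergeExecutor::handle`: a promotion merge
/// (override present) always streams; a regular merge streams only when the
/// flag is on. The `u32` type is an assumption for this sketch.
fn select_engine(target_prefix_len_override: Option<u32>, use_streaming_engine: bool) -> Engine {
    let is_promotion = target_prefix_len_override.is_some();
    if is_promotion || use_streaming_engine {
        Engine::Streaming
    } else {
        Engine::InMemory
    }
}
```

With the flag at its default of `false`, only promotion merges reach the streaming engine.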

Row-content equivalence between the two engines is enforced by parity
tests in `quickwit-parquet-engine/src/merge/tests.rs::parity`. These
must keep passing as long as both engines coexist.
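
One way to read "row-content equivalence" is that both engines must produce the same rows in the same order, even if they chunk the output differently. The sketch below assumes that reading; the actual assertions in the parity tests are not shown here, and `flatten_rows` and `row_content_equal` are hypothetical helpers operating on toy `(i64, String)` rows.

```rust
/// Flattens chunked output (for example row groups or pages) into a single
/// row sequence.
fn flatten_rows(chunks: &[Vec<(i64, String)>]) -> Vec<(i64, String)> {
    chunks.iter().flatten().cloned().collect()
}

/// Returns true when both outputs contain the same rows in the same order,
/// regardless of how each engine split them into chunks.
fn row_content_equal(a: &[Vec<(i64, String)>], b: &[Vec<(i64, String)>]) -> bool {
    flatten_rows(a) == flatten_rows(b)
}
```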

## Signal Impact

Applies to **metrics** (the only signal currently using the Parquet
pipeline). Will apply to **traces** and **logs** when those signals
adopt Parquet splits. The deviation does not affect Tantivy-backed
pipelines.

## Impact

| Aspect | ADR Target | Current Reality |
|--------|------------|-----------------|
| Engines in production | One streaming engine | Two (streaming + in-memory) |
| Memory model | Page-bounded; ~constant per column | In-memory engine: O(total input column size) per merge |
| Configuration surface | None — engine choice is internal | One YAML flag (`parquet_merge_use_streaming_engine`) |
| Code surface to maintain | One engine | Two engines + parity tests + routing branch |
| Operator rollback | Not applicable — only one path | Flip flag to `false`, no redeploy needed |

## Why This Exists

The streaming engine is new code. ADR-003 describes the target memory
model but does not guarantee bug-free first-deployment behavior. Three
forces produced the dual-engine state:

1. **Production safety.** The in-memory engine has been the live merge
path during the metrics pipeline's bring-up. Replacing it wholesale
in a single PR, without an in-place fallback, would mean that any bug
in the streaming engine requires a redeploy to recover. With the flag,
recovery is `config edit + restart`.
2. **Staged rollout.** Production confidence is built by enabling the
streaming engine on a soak fleet, observing for some time, then
flipping the default. The dual-engine state is the necessary
infrastructure for that rollout.
3. **Parity verifiable, not certain.** The parity tests in
`merge::tests::parity` cover representative synthetic fixtures.
Production data has shapes those fixtures don't cover. The fallback
exists because parity is a strong-but-not-total guarantee.

## Priority Assessment

- **PoC / MVP**: acceptable — dual-engine is in fact the deliberate
state during MVP rollout.
- **Production (current)**: acceptable. The flag defaults to `false`
and rollout has not begun. The streaming engine is exercised only by
promotion merges (whose execution will start once GAP-011 is closed).
- **Production (post-soak)**: not acceptable. Once the streaming engine
has soaked at default-`true` in production, the in-memory engine
becomes dead code that complicates the merge-executor and obscures
the ADR-003 memory contract. Resolve before merging additional
significant work into `parquet_merge_executor.rs`.

## Exit Criteria

The deviation resolves when **all** of the following hold:

1. `IndexerConfig::default_parquet_merge_use_streaming_engine` defaults
to `true` in `quickwit-config`.
2. At least one production fleet has run with the flag set to `true` for
a soak window of ≥ 2 weeks with no merge-correctness incidents (no
data loss, no schema mismatch, no merge-output-rows-≠-input-rows
alerts).
3. No rollback to the in-memory engine has been issued during the soak.

When those are met, the follow-up PR:

- Deletes `merge_sorted_parquet_files` from `quickwit-parquet-engine`.
- Deletes the in-memory branch in `ParquetMergeExecutor::handle`.
- Deletes the `use_streaming_engine` field on `ParquetMergeExecutor` and
`ParquetMergePipelineParams`.
- Deletes `IndexerConfig::parquet_merge_use_streaming_engine`.
- Deletes `merge::tests::parity` (with one engine left, there is nothing
to compare).
- Closes this deviation.

## Work Required to Match ADR

| Change | Difficulty | Description |
|--------|------------|-------------|
| Flip default to `true` | Trivial | One-line change in `IndexerConfig::default_parquet_merge_use_streaming_engine`. Lands after soak. |
| Production soak | Operational | Run with `true` on at least one fleet for ≥ 2 weeks, monitor merge-correctness signals. |
| Delete in-memory engine | Moderate | Remove `merge_sorted_parquet_files`, the fallback branch, the flag, and the parity tests. Mechanically straightforward but touches several call sites. |

## Recommendation

**Accept for now.** The dual-engine state is the deliberate output of a
flag-with-fallback rollout pattern (see commit history of #6441 and
related PRs). Resolution is a known follow-up, not technical debt that
needs to be paid down ahead of schedule.

Track the exit criteria in this doc. When all three conditions hold,
open the deletion PR and close this deviation.

## References

- [ADR-003 Time-Windowed Sorted Compaction](../003-time-windowed-sorted-compaction.md) §4
- [GAP-011 No legacy promotion planner](../gaps/011-no-legacy-promotion-planner.md)
- [GAP-012 Merge downloads instead of streaming](../gaps/012-merge-downloads-instead-of-streaming.md)
- PR #6441 (wire-in of the YAML flag)

## Date

2026-05-18
3 changes: 1 addition & 2 deletions docs/internals/adr/deviations/README.md
@@ -94,8 +94,7 @@ Deviation files use sequential numbering: `001-short-description.md`

| Deviation | Title | Related ADR | Priority |
|-----------|-------|-------------|----------|

*No deviations recorded yet.*
| [001](./001-dual-parquet-merge-engines.md) | Dual Parquet merge engines during streaming-engine rollout | [ADR-003](../003-time-windowed-sorted-compaction.md) | Accept until post-soak |

## Lifecycle

136 changes: 136 additions & 0 deletions docs/internals/adr/gaps/011-no-legacy-promotion-planner.md
@@ -0,0 +1,136 @@
# GAP-011: No Planner-Level Legacy Promotion

**Status**: Open
**Discovered**: 2026-05-18
**Context**: Codex review on PR #6423 (`feat(merge): legacy promotion path + body-col schema evolution`) flagged that the promotion path is wired end-to-end at the library + executor layer but has no production trigger at the planner / policy level.

## Problem

The streaming Parquet merge stack now contains a complete *legacy promotion* pipeline:

- `ParquetMergeOperation::promote_legacy(splits, target_prefix_len)` constructs an operation with
`target_prefix_len_override = Some(target)`.
- `merge::execute_merge_operation` routes each input through `LegacyInputAdapter` when its
declared `rg_partition_prefix_len < target` and through `StreamingParquetReader` otherwise. The
streaming engine then sees a homogeneous stream advertising `prefix_len = target` on every
input.
- `ParquetMergeExecutor` (in `quickwit-indexing`) detects `target_prefix_len_override.is_some()`
and routes those merges through `execute_merge_operation` (with `LocalFileByteSource`) instead
of the in-memory `merge_sorted_parquet_files` path.
- `merge_parquet_split_metadata` accepts a `mixed_prefix_ok: bool` flag so the post-merge
aggregator skips the input-side equality check.
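
The per-input routing inside a promotion merge can be sketched as follows. `InputPath`, `choose_input_path`, and `plan_inputs` are hypothetical names standing in for the choice between `LegacyInputAdapter` and `StreamingParquetReader`; the `u32` prefix-length type is an assumption.

```rust
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum InputPath {
    /// Stand-in for `LegacyInputAdapter`.
    LegacyAdapter,
    /// Stand-in for `StreamingParquetReader`.
    StreamingReader,
}

/// An input whose declared prefix length is below the target goes through
/// the legacy adapter; every other input is read directly.
fn choose_input_path(declared_prefix_len: u32, target_prefix_len: u32) -> InputPath {
    if declared_prefix_len < target_prefix_len {
        InputPath::LegacyAdapter
    } else {
        InputPath::StreamingReader
    }
}

/// Applies the rule to every input of one promotion operation.
fn plan_inputs(declared_prefix_lens: &[u32], target_prefix_len: u32) -> Vec<InputPath> {
    declared_prefix_lens
        .iter()
        .map(|&declared| choose_input_path(declared, target_prefix_len))
        .collect()
}
```

For a `prefix_len = 0` + `prefix_len = 1` pair promoted to target `1`, the first input takes the adapter and the second is read directly.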

What's missing: **nothing in the planner ever creates a `promote_legacy` operation in
production**. `MergePolicyState::record_split` buckets each split by
`CompactionScope::from_split`, and that scope key includes `rg_partition_prefix_len`. Legacy
splits (`prefix_len = 0`) and aligned splits (`prefix_len > 0`) therefore land in *different*
buckets before `ParquetMergePolicy::operations` ever runs. The production policy then iterates
each bucket independently and emits `ParquetMergeOperation::new` (regular merge). A repo-wide
search finds `promote_legacy` only in tests.

In a mixed deployment (legacy + aligned splits coexisting), legacy splits therefore stay in
their `prefix_len = 0` bucket forever — never gaining the prefix alignment that downstream
locality compaction depends on. The promotion plumbing is reachable only from tests.
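
The bucketing effect can be reproduced with a toy scope key. This is a sketch under assumptions: `ScopeKey`, `Split`, and their fields are hypothetical simplifications of `CompactionScope` (the real key also covers sort fields), kept only to show why two otherwise identical splits never meet in one bucket.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for `CompactionScope`; field names are illustrative.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct ScopeKey {
    window_start_secs: i64,
    merge_level: u32,
    rg_partition_prefix_len: u32,
}

#[derive(Debug, Clone)]
struct Split {
    id: &'static str,
    window_start_secs: i64,
    merge_level: u32,
    rg_partition_prefix_len: u32,
}

/// Derives the bucket key. Because the prefix length is part of the key,
/// legacy and aligned splits get different keys.
fn scope_of(split: &Split) -> ScopeKey {
    ScopeKey {
        window_start_secs: split.window_start_secs,
        merge_level: split.merge_level,
        rg_partition_prefix_len: split.rg_partition_prefix_len,
    }
}

/// Groups split ids by scope, as a policy state keyed by a `BTreeMap` would.
fn bucket_splits(splits: &[Split]) -> BTreeMap<ScopeKey, Vec<&'static str>> {
    let mut buckets: BTreeMap<ScopeKey, Vec<&'static str>> = BTreeMap::new();
    for split in splits {
        buckets.entry(scope_of(split)).or_default().push(split.id);
    }
    buckets
}
```

Two splits that share a window and merge level but differ in prefix length end up in two single-element buckets, so a policy that iterates buckets independently never sees them together.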

## Evidence

- `quickwit-parquet-engine/src/merge/policy/mod.rs`: `ParquetMergePolicy::operations` calls
`ParquetMergeOperation::new(...)` only. `promote_legacy` is constructed only by tests in the
same file.
- `MergePolicyState::record_split` keys its `BTreeMap` by `CompactionScope::from_split`. The
scope derivation includes `rg_partition_prefix_len`, so a legacy split and a prefix-aligned
split with otherwise identical sort fields / window / merge level are never compared by the
policy.
- The executor branch added in PR #6423
(`scratch.merge_operation.target_prefix_len_override.is_some()`) routes promotion through
`execute_merge_operation`. Library coverage at `test_promote_legacy_executor_end_to_end`
exercises a `prefix_len = 0` + `prefix_len = 1` pair successfully. But that operation is only
ever constructed inside the test.

## State of the Art

- **Iceberg**: Compaction policies inspect file-level metadata (partitioning, sort order) and
can rewrite files to align with the latest table partitioning even when individual files
pre-date the change. The compaction service treats schema-evolution-style rewrites as
first-class operations.
- **Husky**: Background re-organization passes that promote files into newer storage layouts.
Tracked separately from the size-tiered compaction policy so cost trade-offs can be tuned.

In both cases, the design separates the *trigger* (decision to promote) from the *mechanism*
(how the promotion is performed). Quickwit currently has the mechanism but not the trigger.

## Potential Solutions

### Option A: Merge legacy + aligned buckets in `CompactionScope::from_split`

Drop `rg_partition_prefix_len` from the scope key (or normalize it to a target value before
bucketing). The policy then sees legacy and aligned splits as candidates for the same
compaction operation and `ParquetMergePolicy::operations` decides whether to emit a regular
merge or a `promote_legacy` operation based on whether the bucket contains mixed prefix
lengths.

Simplest change, but requires the policy to detect mixed-prefix buckets and choose between
`new` and `promote_legacy` per operation.
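
A minimal sketch of the per-bucket choice, assuming the scope key no longer contains the prefix length and the target is supplied by configuration. `PlannedOp` and `plan_bucket` are hypothetical names, not part of the existing policy.

```rust
#[derive(Debug, PartialEq, Eq)]
enum PlannedOp {
    /// Stand-in for `ParquetMergeOperation::new`.
    Regular,
    /// Stand-in for `ParquetMergeOperation::promote_legacy(splits, target)`.
    PromoteLegacy { target_prefix_len: u32 },
}

/// Picks the operation kind for one bucket from the prefix lengths of its
/// splits. Returns `None` for an empty bucket.
fn plan_bucket(prefix_lens: &[u32], target_prefix_len: u32) -> Option<PlannedOp> {
    let first = *prefix_lens.first()?;
    let mixed = prefix_lens.iter().any(|&p| p != first);
    if mixed {
        Some(PlannedOp::PromoteLegacy { target_prefix_len })
    } else {
        Some(PlannedOp::Regular)
    }
}
```

Note that an all-legacy bucket is homogeneous, so this sketch still plans a regular merge for it.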

### Option B: Dedicated promotion pass

Run a separate pass before the regular compaction policy that scans for legacy splits and emits
`promote_legacy` operations for them. The regular policy then sees only aligned splits.

Cleaner separation of concerns, but means legacy splits are migrated *before* any opportunity
to coalesce them with aligned neighbors in a single multi-input merge — possibly more work
overall.
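
The dedicated pass amounts to partitioning splits before the regular policy runs. The sketch below assumes a split counts as legacy when its prefix length is below the target; `SplitMeta`, `PassOutput`, and `dedicated_promotion_pass` are hypothetical names.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
struct SplitMeta {
    id: String,
    rg_partition_prefix_len: u32,
}

/// Result of the pre-pass: splits to promote, and splits left for the
/// regular compaction policy.
struct PassOutput {
    promote: Vec<SplitMeta>,
    regular_candidates: Vec<SplitMeta>,
}

/// Separates legacy splits (prefix length below the target) from aligned
/// ones so the regular policy only ever sees aligned splits.
fn dedicated_promotion_pass(splits: Vec<SplitMeta>, target_prefix_len: u32) -> PassOutput {
    let (promote, regular_candidates): (Vec<SplitMeta>, Vec<SplitMeta>) = splits
        .into_iter()
        .partition(|s| s.rg_partition_prefix_len < target_prefix_len);
    PassOutput { promote, regular_candidates }
}
```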

### Option C: Hybrid — bucket together, prefer single-pass promotion

Keep scope bucketing as in Option A. Inside the policy, when a bucket contains mixed prefix
lengths and has enough splits to merit a multi-input merge, emit `promote_legacy`. When only
legacy splits exist (no aligned neighbor), emit `promote_legacy` with the same target:
single-input promotion is still valuable because it converts the file to the new format for
future locality compaction.

Most flexible: the policy can amortize promotion cost when there are aligned neighbors and
still promote isolated legacy splits in the background.
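
The hybrid rule can be written as a small decision function. This is a sketch under assumptions: `Decision`, `decide`, and `min_merge_inputs` are hypothetical; a split is treated as legacy when its prefix length is below the target; and the behavior for a mixed bucket that is still too small (wait) is a guess, since the option text does not specify it.

```rust
#[derive(Debug, PartialEq, Eq)]
enum Decision {
    /// Emit a regular merge (stand-in for `ParquetMergeOperation::new`).
    Regular,
    /// Emit a promotion (stand-in for `promote_legacy`).
    PromoteLegacy { target_prefix_len: u32 },
    /// Emit nothing yet.
    Wait,
}

fn decide(prefix_lens: &[u32], target_prefix_len: u32, min_merge_inputs: usize) -> Decision {
    let legacy = prefix_lens.iter().filter(|&&p| p < target_prefix_len).count();
    let aligned = prefix_lens.len() - legacy;
    match (legacy, aligned) {
        // Empty bucket: nothing to do.
        (0, 0) => Decision::Wait,
        // Only aligned splits: ordinary size-based behavior.
        (0, n) if n >= min_merge_inputs => Decision::Regular,
        (0, _) => Decision::Wait,
        // Only legacy splits: promote even a single input.
        (_, 0) => Decision::PromoteLegacy { target_prefix_len },
        // Mixed bucket: promote once it is large enough for a multi-input merge.
        (l, a) if l + a >= min_merge_inputs => Decision::PromoteLegacy { target_prefix_len },
        // Assumption: a small mixed bucket waits for more splits.
        _ => Decision::Wait,
    }
}
```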

## Signal Impact

Primarily affects **metrics** in the near term: the legacy split format pre-dates the
prefix-aligned RG layout, and only metrics has both formats in flight today. Traces and logs
on the Parquet path will eventually reach the same state if a layout change ever happens; the
same planner machinery would cover them.

## Cost Considerations

Promotion is strictly more expensive than a regular merge: the legacy adapter buffers the full
input file in memory and re-encodes it as a single-RG stream before the merge engine sees it.
For 50 MB metrics splits this is acceptable; for larger inputs the in-memory buffer is the
gating cost.

The planner should account for this when scheduling — promotion is best amortized into a
multi-input merge rather than performed as a standalone file rewrite. Option C's "prefer
multi-input promotion, fall back to single-input" structure captures this.

## Impact

- **Severity**: Medium. Legacy splits accumulate cost (every query against them pays the
prefix-less scan cost) but correctness is preserved — the locality compaction stack still
works on aligned splits.
- **Frequency**: Persistent. Legacy splits never migrate without an explicit trigger.
- **Affected Areas**: `quickwit-parquet-engine/src/merge/policy/`, `quickwit-parquet-engine/src/merge/mod.rs` (`MergePolicyState::record_split` + `CompactionScope`).

## Next Steps

- [ ] Decide between options A / B / C based on operational priorities and benchmark data.
- [ ] Design the policy-level "should promote?" heuristic: how many legacy splits before
triggering, whether to wait for aligned neighbors, how to deprioritize promotion vs
regular compaction.
- [ ] Add metrics for `legacy_splits_pending_promotion` and `promotion_operations_emitted` so we
can observe the policy in production.
- [ ] Wire whichever option is chosen, with an integration test that exercises the full path
(legacy split → planner → executor → published prefix-aligned split).

## References

- PR #6423 (legacy promotion path + body-col schema evolution).
- Codex review comment id `4311184497` (raised the gap).
- `test_promote_legacy_executor_end_to_end` in `quickwit-parquet-engine::merge::streaming` —
library-level coverage of the mechanism.