diff --git a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs index 637fe72017b..ff46948e429 100644 --- a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs +++ b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs @@ -27,7 +27,7 @@ use quickwit_proto::compaction::{ CompactionResult, MergeTaskAssignment, ReportStatusRequest, ReportStatusResponse, }; use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient}; -use quickwit_proto::types::{IndexUid, NodeId, SourceId}; +use quickwit_proto::types::NodeId; use time::OffsetDateTime; use tracing::{error, info}; use ulid::Ulid; @@ -190,20 +190,13 @@ impl CompactionPlanner { let pending = self.state.pop_pending(available_slots as usize); let mut assignments = Vec::with_capacity(pending.len()); - for (partition_key, operation) in pending { + for operation in pending { let task_id = Ulid::new().to_string(); - let Some(index_entry) = self.index_config_metastore.get(&partition_key.index_uid) - else { - error!(index_uid=%partition_key.index_uid, "index config not found for pending operation, skipping"); + let Some(index_entry) = self.index_config_metastore.get(&operation.index_uid) else { + error!(index_uid=%operation.index_uid, "index config not found for pending operation, skipping"); continue; }; - let assignment = build_task_assignment( - &task_id, - index_entry, - &operation, - &partition_key.index_uid, - &partition_key.source_id, - ); + let assignment = build_task_assignment(&task_id, index_entry, &operation); let split_ids = operation .splits_as_slice() @@ -237,8 +230,6 @@ fn build_task_assignment( task_id: &str, index_entry: &IndexEntry, operation: &MergeOperation, - index_uid: &IndexUid, - source_id: &SourceId, ) -> MergeTaskAssignment { MergeTaskAssignment { task_id: task_id.to_string(), @@ -253,8 +244,8 @@ fn build_task_assignment( search_settings_json: index_entry.search_settings_json(), indexing_settings_json: index_entry.indexing_settings_json(), retention_policy_json: index_entry.retention_policy_json(), - index_uid: Some(index_uid.clone()), - source_id: source_id.to_string(), + index_uid: Some(operation.index_uid.clone()), + source_id: operation.source_id.clone(), index_storage_uri: index_entry.index_storage_uri(), merge_level: operation.merge_level() as u64, } diff --git a/quickwit/quickwit-compaction/src/planner/compaction_state.rs b/quickwit/quickwit-compaction/src/planner/compaction_state.rs index 58cf85a5bb3..6f9dc2069c7 100644 --- a/quickwit/quickwit-compaction/src/planner/compaction_state.rs +++ b/quickwit/quickwit-compaction/src/planner/compaction_state.rs @@ -51,7 +51,6 @@ impl CompactionPartitionKey { #[derive(Debug)] struct InFlightCompaction { - task_id: TaskId, split_ids: Vec, node_id: NodeId, last_heartbeat: Instant, @@ -66,8 +65,6 @@ pub struct CompactionState { needs_compaction_split_ids: HashSet, in_flight: HashMap, in_flight_split_ids: HashSet, - /// TODO: add index_uid and source_id to MergeOperation so we don't need the partition key - /// here. pending_operations: PendingOperations, } @@ -123,8 +120,7 @@ impl CompactionState { self.in_flight_split_ids .insert(split.split_id().to_string()); } - self.pending_operations - .push(partition_key.clone(), operation); + self.pending_operations.push(operation); } } if splits.is_empty() { @@ -170,7 +166,6 @@ impl CompactionState { self.in_flight.insert( task.task_id.clone(), InFlightCompaction { - task_id: task.task_id.clone(), split_ids: task.split_ids.clone(), node_id: node_id.clone(), last_heartbeat: Instant::now(), @@ -201,8 +196,8 @@ impl CompactionState { } } - /// Pops up to `count` pending operations for assignment. - pub fn pop_pending(&mut self, count: usize) -> Vec<(CompactionPartitionKey, MergeOperation)> { + /// Pops up to `count` pending operations for assignment, highest-score first. + pub fn pop_pending(&mut self, count: usize) -> Vec { let count = count.min(self.pending_operations.len()); let mut operations = Vec::with_capacity(count); for _ in 0..count { @@ -214,9 +209,8 @@ impl CompactionState { /// Records that an operation has been assigned to a worker. pub fn record_assignment(&mut self, task_id: TaskId, split_ids: Vec, node_id: NodeId) { self.in_flight.insert( - task_id.clone(), + task_id, InFlightCompaction { - task_id, split_ids, node_id, last_heartbeat: Instant::now(), @@ -330,7 +324,7 @@ mod tests { // Splits moved from needs_compaction to in_flight. assert!(!state.pending_operations.is_empty()); - for (_, op) in state.pending_operations.iter() { + for op in state.pending_operations.iter() { for split in op.splits_as_slice() { assert!(!state.needs_compaction_split_ids.contains(split.split_id())); assert!(state.in_flight_split_ids.contains(split.split_id())); @@ -413,7 +407,7 @@ mod tests { let pending = state.pop_pending(1); assert_eq!(pending.len(), 1); - let (_, operation) = &pending[0]; + let operation = &pending[0]; let split_ids: Vec = operation .splits_as_slice() .iter() diff --git a/quickwit/quickwit-compaction/src/planner/mod.rs b/quickwit/quickwit-compaction/src/planner/mod.rs index 0fe49bd190e..18b408580d3 100644 --- a/quickwit/quickwit-compaction/src/planner/mod.rs +++ b/quickwit/quickwit-compaction/src/planner/mod.rs @@ -13,45 +13,43 @@ // limitations under the License. mod compaction_planner; -#[allow(dead_code)] mod compaction_state; -#[allow(dead_code)] mod index_config_metastore; pub(crate) mod metrics; -use std::collections::VecDeque; +use std::collections::BinaryHeap; pub use compaction_planner::CompactionPlanner; use quickwit_indexing::merge_policy::MergeOperation; -use crate::planner::compaction_state::CompactionPartitionKey; use crate::planner::metrics::COMPACTION_PLANNER_METRICS; use crate::source_uid_metrics_label; -/// Queue of merge operations awaiting assignment, with the -/// `pending_merge_operations` gauge maintained inline. Push/pop are the only -/// mutation paths so the metric stays consistent with `len()`. +/// Max-heap of merge operations awaiting assignment, ordered by +/// `MergeOperation`'s score-based `Ord`. The `pending_merge_operations` gauge +/// is maintained inline; push/pop are the only mutation paths so the metric +/// stays consistent with `len()`. #[derive(Debug)] struct PendingOperations { - inner: VecDeque<(CompactionPartitionKey, MergeOperation)>, + inner: BinaryHeap, } impl PendingOperations { fn new() -> Self { Self { - inner: VecDeque::new(), + inner: BinaryHeap::new(), } } - fn push(&mut self, partition_key: CompactionPartitionKey, operation: MergeOperation) { - Self::adjust_gauge(&partition_key, &operation, 1); - self.inner.push_back((partition_key, operation)); + fn push(&mut self, operation: MergeOperation) { + Self::adjust_gauge(&operation, 1); + self.inner.push(operation); } - fn pop(&mut self) -> Option<(CompactionPartitionKey, MergeOperation)> { - let (partition_key, operation) = self.inner.pop_front()?; - Self::adjust_gauge(&partition_key, &operation, -1); - Some((partition_key, operation)) + fn pop(&mut self) -> Option { + let operation = self.inner.pop()?; + Self::adjust_gauge(&operation, -1); + Some(operation) } fn len(&self) -> usize { @@ -64,17 +62,12 @@ impl PendingOperations { } #[cfg(test)] - fn iter(&self) -> impl Iterator { + fn iter(&self) -> impl Iterator { self.inner.iter() } - fn adjust_gauge( - partition_key: &CompactionPartitionKey, - operation: &MergeOperation, - delta: i64, - ) { - let source_uid_label = - source_uid_metrics_label(&partition_key.index_uid, &partition_key.source_id); + fn adjust_gauge(operation: &MergeOperation, delta: i64) { + let source_uid_label = source_uid_metrics_label(&operation.index_uid, &operation.source_id); let merge_level = operation.merge_level().to_string(); COMPACTION_PLANNER_METRICS .pending_merge_operations diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs index bbe5267d514..fcdbad2c815 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -209,30 +209,12 @@ struct ScheduleMerge { split_downloader_mailbox: Mailbox, } -/// The higher, the sooner we will execute the merge operation. -/// A good merge operation -/// - strongly reduces the number splits -/// - is light. -fn score_merge_operation(merge_operation: &MergeOperation) -> u64 { - let total_num_bytes: u64 = merge_operation.total_num_bytes(); - if total_num_bytes == 0 { - // Silly corner case that should never happen. - return u64::MAX; - } - // We will remove splits.len() and add 1 merge splits. - let delta_num_splits = (merge_operation.splits.len() - 1) as u64; - // We use integer arithmetic to avoid `f64 are not ordered` silliness. - (delta_num_splits << 48) - .checked_div(total_num_bytes) - .unwrap_or(1u64) -} - impl ScheduleMerge { pub fn new( merge_operation: TrackedObject, split_downloader_mailbox: Mailbox, ) -> ScheduleMerge { - let score = score_merge_operation(&merge_operation); + let score = merge_operation.score; ScheduleMerge { score, merge_operation, @@ -315,24 +297,6 @@ mod tests { MergeOperation::new_merge_operation(splits) } - #[test] - fn test_score_merge_operation() { - let score_merge_operation_aux = |num_splits, num_bytes_per_split| { - let merge_operation = build_merge_operation(num_splits, num_bytes_per_split); - score_merge_operation(&merge_operation) - }; - assert!(score_merge_operation_aux(10, 10_000_000) < score_merge_operation_aux(10, 999_999)); - assert!( - score_merge_operation_aux(10, 10_000_000) > score_merge_operation_aux(9, 10_000_000) - ); - assert_eq!( - // 9 - 1 = 8 splits removed. - score_merge_operation_aux(9, 10_000_000), - // 5 - 1 = 4 splits removed. - score_merge_operation_aux(5, 10_000_000 * 9 / 10) - ); - } - #[tokio::test] async fn test_merge_schedule_service_prioritize() { let universe = Universe::new(); diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 37bbf15edbc..f950c30635c 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -26,7 +26,7 @@ pub use nop_merge_policy::NopMergePolicy; use quickwit_config::IndexingSettings; use quickwit_config::merge_policy_config::MergePolicyConfig; use quickwit_metastore::{SplitMaturity, SplitMetadata}; -use quickwit_proto::types::SplitId; +use quickwit_proto::types::{IndexUid, SourceId, SplitId}; use serde::Serialize; pub(crate) use stable_log_merge_policy::StableLogMergePolicy; use tantivy::TrackedObject; @@ -83,20 +83,35 @@ pub struct MergeOperation { #[serde(skip_serializing)] pub merge_parent_span: Span, pub merge_split_id: SplitId, + pub index_uid: IndexUid, + pub source_id: SourceId, pub splits: Vec, pub operation_type: MergeOperationType, + /// Priority score, computed once at construction. Higher = run sooner. + /// `Ord`/`Eq` for `MergeOperation` are defined purely on this field so a + /// `BinaryHeap` is a max-heap by score. + pub score: u64, } impl MergeOperation { + /// All splits must belong to the same `(index_uid, source_id)` — a precondition + /// merge policies already satisfy because they partition before merging. pub fn new_merge_operation(splits: Vec) -> Self { + let first_split = splits.first().expect("merge operation must have splits"); + let index_uid = first_split.index_uid.clone(); + let source_id = first_split.source_id.clone(); let merge_split_id = new_split_id(); let split_ids = splits.iter().map(|split| split.split_id()).collect_vec(); let merge_parent_span = info_span!("merge", merge_split_id=%merge_split_id, split_ids=?split_ids, typ=%MergeOperationType::Merge); + let score = compute_score(&splits); Self { merge_parent_span, merge_split_id, + index_uid, + source_id, splits, operation_type: MergeOperationType::Merge, + score, } } @@ -108,13 +123,20 @@ impl MergeOperation { } pub fn new_delete_and_merge_operation(split: SplitMetadata) -> Self { + let index_uid = split.index_uid.clone(); + let source_id = split.source_id.clone(); let merge_split_id = new_split_id(); let merge_parent_span = info_span!("delete", merge_split_id=%merge_split_id, split_ids=?split.split_id(), typ=%MergeOperationType::DeleteAndMerge); + let splits = vec![split]; + let score = compute_score(&splits); Self { merge_parent_span, merge_split_id, - splits: vec![split], + index_uid, + source_id, + splits, operation_type: MergeOperationType::DeleteAndMerge, + score, } } @@ -131,6 +153,49 @@ impl MergeOperation { } } +/// The higher, the sooner we will execute the merge operation. +/// A good merge operation: +/// - strongly reduces the number of splits +/// - is light. +fn compute_score(splits: &[SplitMetadata]) -> u64 { + let total_num_bytes: u64 = splits.iter().map(|split| split.footer_offsets.end).sum(); + if total_num_bytes == 0 { + // Silly corner case that should never happen. + return u64::MAX; + } + // We will remove splits.len() and add 1 merge split. + let delta_num_splits = (splits.len() - 1) as u64; + // We use integer arithmetic to avoid `f64 are not ordered` silliness. + (delta_num_splits << 48) + .checked_div(total_num_bytes) + .unwrap_or(1u64) +} + +impl PartialEq for MergeOperation { + fn eq(&self, other: &Self) -> bool { + self.merge_split_id == other.merge_split_id + } +} + +impl Eq for MergeOperation {} + +impl PartialOrd for MergeOperation { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for MergeOperation { + /// The way we reason about ordering merge operations is that a highest score should take + /// precedence over a lower score; by that logic, score already serves the Ord property; we + /// formalize that here. + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.score + .cmp(&other.score) + .then_with(|| self.merge_split_id.cmp(&other.merge_split_id)) + } +} + impl fmt::Debug for MergeOperation { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( @@ -241,6 +306,30 @@ pub mod tests { }; use crate::models::{NewSplits, create_split_metadata}; + #[test] + fn test_score() { + fn op(num_splits: usize, num_bytes_per_split: u64) -> MergeOperation { + let splits: Vec = (0..num_splits) + .map(|_| SplitMetadata { + footer_offsets: 0..num_bytes_per_split, + ..Default::default() + }) + .collect(); + MergeOperation::new_merge_operation(splits) + } + // Lighter merge (smaller total bytes) at the same split count scores higher. + assert!(op(10, 10_000_000).score < op(10, 999_999).score); + // More splits removed at the same total bytes scores higher. + assert!(op(10, 10_000_000).score > op(9, 10_000_000).score); + // Score is `(delta_splits << 48) / total_bytes` — equal ratios yield equal scores. + assert_eq!( + // 9 splits, 90M bytes → delta=8. + op(9, 10_000_000).score, + // 5 splits, 45M bytes → delta=4 (same 8/90M ratio). + op(5, 10_000_000 * 9 / 10).score, + ); + } + fn pow_of_10(n: usize) -> usize { 10usize.pow(n as u32) }