Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions quickwit/quickwit-compaction/src/planner/compaction_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_proto::compaction::{
CompactionResult, MergeTaskAssignment, ReportStatusRequest, ReportStatusResponse,
};
use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId, SourceId};
use quickwit_proto::types::NodeId;
use time::OffsetDateTime;
use tracing::{error, info};
use ulid::Ulid;
Expand Down Expand Up @@ -190,20 +190,13 @@ impl CompactionPlanner {
let pending = self.state.pop_pending(available_slots as usize);
let mut assignments = Vec::with_capacity(pending.len());

for (partition_key, operation) in pending {
for operation in pending {
let task_id = Ulid::new().to_string();
let Some(index_entry) = self.index_config_metastore.get(&partition_key.index_uid)
else {
error!(index_uid=%partition_key.index_uid, "index config not found for pending operation, skipping");
let Some(index_entry) = self.index_config_metastore.get(&operation.index_uid) else {
error!(index_uid=%operation.index_uid, "index config not found for pending operation, skipping");
continue;
};
let assignment = build_task_assignment(
&task_id,
index_entry,
&operation,
&partition_key.index_uid,
&partition_key.source_id,
);
let assignment = build_task_assignment(&task_id, index_entry, &operation);

let split_ids = operation
.splits_as_slice()
Expand Down Expand Up @@ -237,8 +230,6 @@ fn build_task_assignment(
task_id: &str,
index_entry: &IndexEntry,
operation: &MergeOperation,
index_uid: &IndexUid,
source_id: &SourceId,
) -> MergeTaskAssignment {
MergeTaskAssignment {
task_id: task_id.to_string(),
Expand All @@ -253,8 +244,8 @@ fn build_task_assignment(
search_settings_json: index_entry.search_settings_json(),
indexing_settings_json: index_entry.indexing_settings_json(),
retention_policy_json: index_entry.retention_policy_json(),
index_uid: Some(index_uid.clone()),
source_id: source_id.to_string(),
index_uid: Some(operation.index_uid.clone()),
source_id: operation.source_id.clone(),
index_storage_uri: index_entry.index_storage_uri(),
merge_level: operation.merge_level() as u64,
}
Expand Down
18 changes: 6 additions & 12 deletions quickwit/quickwit-compaction/src/planner/compaction_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ impl CompactionPartitionKey {

#[derive(Debug)]
struct InFlightCompaction {
task_id: TaskId,
split_ids: Vec<SplitId>,
node_id: NodeId,
last_heartbeat: Instant,
Expand All @@ -66,8 +65,6 @@ pub struct CompactionState {
needs_compaction_split_ids: HashSet<SplitId>,
in_flight: HashMap<TaskId, InFlightCompaction>,
in_flight_split_ids: HashSet<SplitId>,
/// TODO: add index_uid and source_id to MergeOperation so we don't need the partition key
/// here.
pending_operations: PendingOperations,
}

Expand Down Expand Up @@ -123,8 +120,7 @@ impl CompactionState {
self.in_flight_split_ids
.insert(split.split_id().to_string());
}
self.pending_operations
.push(partition_key.clone(), operation);
self.pending_operations.push(operation);
}
}
if splits.is_empty() {
Expand Down Expand Up @@ -170,7 +166,6 @@ impl CompactionState {
self.in_flight.insert(
task.task_id.clone(),
InFlightCompaction {
task_id: task.task_id.clone(),
split_ids: task.split_ids.clone(),
node_id: node_id.clone(),
last_heartbeat: Instant::now(),
Expand Down Expand Up @@ -201,8 +196,8 @@ impl CompactionState {
}
}

/// Pops up to `count` pending operations for assignment.
pub fn pop_pending(&mut self, count: usize) -> Vec<(CompactionPartitionKey, MergeOperation)> {
/// Pops up to `count` pending operations for assignment, highest-score first.
pub fn pop_pending(&mut self, count: usize) -> Vec<MergeOperation> {
let count = count.min(self.pending_operations.len());
let mut operations = Vec::with_capacity(count);
for _ in 0..count {
Expand All @@ -214,9 +209,8 @@ impl CompactionState {
/// Records that an operation has been assigned to a worker.
pub fn record_assignment(&mut self, task_id: TaskId, split_ids: Vec<SplitId>, node_id: NodeId) {
self.in_flight.insert(
task_id.clone(),
task_id,
InFlightCompaction {
task_id,
split_ids,
node_id,
last_heartbeat: Instant::now(),
Expand Down Expand Up @@ -330,7 +324,7 @@ mod tests {

// Splits moved from needs_compaction to in_flight.
assert!(!state.pending_operations.is_empty());
for (_, op) in state.pending_operations.iter() {
for op in state.pending_operations.iter() {
for split in op.splits_as_slice() {
assert!(!state.needs_compaction_split_ids.contains(split.split_id()));
assert!(state.in_flight_split_ids.contains(split.split_id()));
Expand Down Expand Up @@ -413,7 +407,7 @@ mod tests {
let pending = state.pop_pending(1);
assert_eq!(pending.len(), 1);

let (_, operation) = &pending[0];
let operation = &pending[0];
let split_ids: Vec<String> = operation
.splits_as_slice()
.iter()
Expand Down
41 changes: 17 additions & 24 deletions quickwit/quickwit-compaction/src/planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,43 @@
// limitations under the License.

mod compaction_planner;
#[allow(dead_code)]
mod compaction_state;
#[allow(dead_code)]
mod index_config_metastore;
pub(crate) mod metrics;

use std::collections::VecDeque;
use std::collections::BinaryHeap;

pub use compaction_planner::CompactionPlanner;
use quickwit_indexing::merge_policy::MergeOperation;

use crate::planner::compaction_state::CompactionPartitionKey;
use crate::planner::metrics::COMPACTION_PLANNER_METRICS;
use crate::source_uid_metrics_label;

/// Queue of merge operations awaiting assignment, with the
/// `pending_merge_operations` gauge maintained inline. Push/pop are the only
/// mutation paths so the metric stays consistent with `len()`.
/// Max-heap of merge operations awaiting assignment, ordered by
/// `MergeOperation`'s score-based `Ord`. The `pending_merge_operations` gauge
/// is maintained inline; push/pop are the only mutation paths so the metric
/// stays consistent with `len()`.
#[derive(Debug)]
struct PendingOperations {
inner: VecDeque<(CompactionPartitionKey, MergeOperation)>,
inner: BinaryHeap<MergeOperation>,
}

impl PendingOperations {
fn new() -> Self {
Self {
inner: VecDeque::new(),
inner: BinaryHeap::new(),
}
}

fn push(&mut self, partition_key: CompactionPartitionKey, operation: MergeOperation) {
Self::adjust_gauge(&partition_key, &operation, 1);
self.inner.push_back((partition_key, operation));
fn push(&mut self, operation: MergeOperation) {
Self::adjust_gauge(&operation, 1);
self.inner.push(operation);
}

fn pop(&mut self) -> Option<(CompactionPartitionKey, MergeOperation)> {
let (partition_key, operation) = self.inner.pop_front()?;
Self::adjust_gauge(&partition_key, &operation, -1);
Some((partition_key, operation))
fn pop(&mut self) -> Option<MergeOperation> {
let operation = self.inner.pop()?;
Self::adjust_gauge(&operation, -1);
Some(operation)
}

fn len(&self) -> usize {
Expand All @@ -64,17 +62,12 @@ impl PendingOperations {
}

#[cfg(test)]
fn iter(&self) -> impl Iterator<Item = &(CompactionPartitionKey, MergeOperation)> {
fn iter(&self) -> impl Iterator<Item = &MergeOperation> {
self.inner.iter()
}

fn adjust_gauge(
partition_key: &CompactionPartitionKey,
operation: &MergeOperation,
delta: i64,
) {
let source_uid_label =
source_uid_metrics_label(&partition_key.index_uid, &partition_key.source_id);
fn adjust_gauge(operation: &MergeOperation, delta: i64) {
let source_uid_label = source_uid_metrics_label(&operation.index_uid, &operation.source_id);
let merge_level = operation.merge_level().to_string();
COMPACTION_PLANNER_METRICS
.pending_merge_operations
Expand Down
38 changes: 1 addition & 37 deletions quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,30 +209,12 @@ struct ScheduleMerge {
split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
}

/// The higher, the sooner we will execute the merge operation.
/// A good merge operation
/// - strongly reduces the number splits
/// - is light.
fn score_merge_operation(merge_operation: &MergeOperation) -> u64 {
let total_num_bytes: u64 = merge_operation.total_num_bytes();
if total_num_bytes == 0 {
// Silly corner case that should never happen.
return u64::MAX;
}
// We will remove splits.len() and add 1 merge splits.
let delta_num_splits = (merge_operation.splits.len() - 1) as u64;
// We use integer arithmetic to avoid `f64 are not ordered` silliness.
(delta_num_splits << 48)
.checked_div(total_num_bytes)
.unwrap_or(1u64)
}

impl ScheduleMerge {
pub fn new(
merge_operation: TrackedObject<MergeOperation>,
split_downloader_mailbox: Mailbox<MergeSplitDownloader>,
) -> ScheduleMerge {
let score = score_merge_operation(&merge_operation);
let score = merge_operation.score;
ScheduleMerge {
score,
merge_operation,
Expand Down Expand Up @@ -315,24 +297,6 @@ mod tests {
MergeOperation::new_merge_operation(splits)
}

#[test]
fn test_score_merge_operation() {
let score_merge_operation_aux = |num_splits, num_bytes_per_split| {
let merge_operation = build_merge_operation(num_splits, num_bytes_per_split);
score_merge_operation(&merge_operation)
};
assert!(score_merge_operation_aux(10, 10_000_000) < score_merge_operation_aux(10, 999_999));
assert!(
score_merge_operation_aux(10, 10_000_000) > score_merge_operation_aux(9, 10_000_000)
);
assert_eq!(
// 9 - 1 = 8 splits removed.
score_merge_operation_aux(9, 10_000_000),
// 5 - 1 = 4 splits removed.
score_merge_operation_aux(5, 10_000_000 * 9 / 10)
);
}

#[tokio::test]
async fn test_merge_schedule_service_prioritize() {
let universe = Universe::new();
Expand Down
Loading
Loading