From 04f524c98971636e37d9dddc1c0ffcb5c4dc5dd5 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Fri, 15 May 2026 15:37:53 -0400 Subject: [PATCH] Performance improvements: multipart upload, parallel gets, spans --- .../src/compaction_pipeline.rs | 1 + .../src/compactor_supervisor.rs | 3 +- .../src/planner/compaction_planner.rs | 1 + .../src/planner/compaction_state.rs | 2 +- .../src/actors/merge_split_downloader.rs | 48 +++++++++---------- ...its-published-maturity-timestamp.down.sql} | 2 +- ...plits-published-maturity-timestamp.up.sql} | 2 +- quickwit/quickwit-storage/src/metrics.rs | 19 ++++++-- .../src/object_storage/policy.rs | 7 +-- .../object_storage/s3_compatible_storage.rs | 1 + 10 files changed, 51 insertions(+), 35 deletions(-) rename quickwit/quickwit-metastore/migrations/postgresql/{27_create-index-splits-published-maturity-timestamp.down.sql => 29_create-index-splits-published-maturity-timestamp.down.sql} (95%) rename quickwit/quickwit-metastore/migrations/postgresql/{27_create-index-splits-published-maturity-timestamp.up.sql => 29_create-index-splits-published-maturity-timestamp.up.sql} (97%) diff --git a/quickwit/quickwit-compaction/src/compaction_pipeline.rs b/quickwit/quickwit-compaction/src/compaction_pipeline.rs index a35017384a0..f3095564795 100644 --- a/quickwit/quickwit-compaction/src/compaction_pipeline.rs +++ b/quickwit/quickwit-compaction/src/compaction_pipeline.rs @@ -26,6 +26,7 @@ use quickwit_indexing::actors::{ MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType, }; use quickwit_indexing::merge_policy::MergeOperation; +use quickwit_indexing::metrics::INDEXER_METRICS; use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox}; use quickwit_proto::indexing::MergePipelineId; use quickwit_proto::metastore::MetastoreServiceClient; diff --git a/quickwit/quickwit-compaction/src/compactor_supervisor.rs b/quickwit/quickwit-compaction/src/compactor_supervisor.rs index 52da22c4951..bb570505db7 100644 --- a/quickwit/quickwit-compaction/src/compactor_supervisor.rs +++ b/quickwit/quickwit-compaction/src/compactor_supervisor.rs @@ -124,6 +124,8 @@ impl CompactorSupervisor { error!(%task_id, %error, "failed to spawn compaction task"); } } + let available_slots = self.pipelines.iter().filter(|pipeline_opt| pipeline_opt.is_none()).count(); + COMPACTOR_METRICS.available_slots.set(available_slots as i64); } async fn spawn_task( @@ -221,7 +223,6 @@ impl CompactorSupervisor { .filter(|s| matches!(s.status, PipelineStatus::InProgress)) .count(); let available_slots = (self.pipelines.len() - in_progress_count) as i64; - COMPACTOR_METRICS.available_slots.set(available_slots); let mut in_progress = Vec::new(); let mut successes = Vec::new(); diff --git a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs index ff46948e429..7b30c2174df 100644 --- a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs +++ b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs @@ -41,6 +41,7 @@ use crate::planner::metrics::COMPACTION_PLANNER_METRICS; /// exists. Splits beyond this cap aren't lost -- they bubble into range as the front of the queue /// is merged off. const SCAN_PAGE_SIZE: usize = 5_000; + #[derive(Debug)] pub struct CompactionPlanner { state: CompactionState, diff --git a/quickwit/quickwit-compaction/src/planner/compaction_state.rs b/quickwit/quickwit-compaction/src/planner/compaction_state.rs index 6f9dc2069c7..4e0616cdfc1 100644 --- a/quickwit/quickwit-compaction/src/planner/compaction_state.rs +++ b/quickwit/quickwit-compaction/src/planner/compaction_state.rs @@ -115,7 +115,7 @@ impl CompactionState { break; } for operation in operations { - for split in operation.splits_as_slice() { + for split in &operation.splits { self.needs_compaction_split_ids.remove(split.split_id()); self.in_flight_split_ids .insert(split.split_id().to_string()); diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 6b1bcc0ddb9..1911ecc0363 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -126,32 +126,30 @@ impl MergeSplitDownloader { download_directory: &Path, ctx: &ActorContext, ) -> Result>, quickwit_actors::ActorExitStatus> { - // we download all of the split files in the scratch directory. - let mut tantivy_dirs = Vec::new(); - for split in splits { - if ctx.kill_switch().is_dead() { - debug!( - split_id = split.split_id(), - "Kill switch was activated. Cancelling download." - ); - return Err(ActorExitStatus::Killed); - } - let io_controls = self - .io_controls - .clone() - .set_progress(ctx.progress().clone()) - .set_kill_switch(ctx.kill_switch().clone()); - let _protect_guard = ctx.protect_zone(); - let tantivy_dir = self - .split_store - .fetch_and_open_split(split.split_id(), download_directory, &io_controls) - .await - .map_err(|error| { - let split_id = split.split_id(); - anyhow::anyhow!(error).context(format!("failed to download split `{split_id}`")) - })?; - tantivy_dirs.push(tantivy_dir); + if ctx.kill_switch().is_dead() { + debug!("kill switch was activated, cancelling download"); + return Err(ActorExitStatus::Killed); } + let io_controls = self + .io_controls + .clone() + .set_progress(ctx.progress().clone()) + .set_kill_switch(ctx.kill_switch().clone()); + let _protect_guard = ctx.protect_zone(); + let download_futures = splits.iter().map(|split| { + let io_controls = io_controls.clone(); + async move { + self.split_store + .fetch_and_open_split(split.split_id(), download_directory, &io_controls) + .await + .map_err(|error| { + let split_id = split.split_id(); + anyhow::anyhow!(error) + .context(format!("failed to download split `{split_id}`")) + }) + } + }); + let tantivy_dirs = futures::future::try_join_all(download_futures).await?; Ok(tantivy_dirs) } } diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/29_create-index-splits-published-maturity-timestamp.down.sql similarity index 95% rename from quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql rename to quickwit/quickwit-metastore/migrations/postgresql/29_create-index-splits-published-maturity-timestamp.down.sql index e4568958939..c8bd22af8c3 100644 --- a/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql +++ b/quickwit/quickwit-metastore/migrations/postgresql/29_create-index-splits-published-maturity-timestamp.down.sql @@ -1,4 +1,4 @@ -- no-transaction -- CONCURRENTLY matches the up migration: avoids blocking writes on `splits` -- while the index is being dropped. Cannot run inside a transaction. -DROP INDEX CONCURRENTLY IF EXISTS splits_published_maturity_timestamp_idx; \ No newline at end of file +DROP INDEX CONCURRENTLY IF EXISTS splits_published_maturity_timestamp_idx; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/29_create-index-splits-published-maturity-timestamp.up.sql similarity index 97% rename from quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql rename to quickwit/quickwit-metastore/migrations/postgresql/29_create-index-splits-published-maturity-timestamp.up.sql index 0d9fa0f5cff..f6a4c7a53d0 100644 --- a/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql +++ b/quickwit/quickwit-metastore/migrations/postgresql/29_create-index-splits-published-maturity-timestamp.up.sql @@ -21,4 +21,4 @@ -- transaction block, hence the `-- no-transaction` directive above. CREATE INDEX CONCURRENTLY IF NOT EXISTS splits_published_maturity_timestamp_idx ON splits (maturity_timestamp, split_id) - WHERE split_state = 'Published'; \ No newline at end of file + WHERE split_state = 'Published'; diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 8b439bfaeaa..8ce8299569c 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -19,8 +19,8 @@ use std::sync::RwLock; use once_cell::sync::Lazy; use quickwit_common::metrics::{ - GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, - new_gauge, new_histogram_vec, + GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, exponential_buckets, new_counter, + new_counter_vec, new_gauge, new_histogram_vec, }; use quickwit_config::CacheConfig; @@ -48,6 +48,9 @@ pub struct StorageMetrics { pub object_storage_bulk_delete_requests_total: IntCounter, pub object_storage_delete_request_duration: Histogram, pub object_storage_bulk_delete_request_duration: Histogram, + pub object_storage_get_object_duration: Histogram, + pub object_storage_put_object_duration: Histogram, + pub object_storage_upload_part_duration: Histogram, } impl Default for StorageMetrics { @@ -86,12 +89,19 @@ impl Default for StorageMetrics { "storage", &[], ["action"], - vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], + exponential_buckets(0.001, 2.0, 18) + .expect("object storage duration buckets should be valid"), ); let object_storage_delete_request_duration = object_storage_request_duration.with_label_values(["delete_object"]); let object_storage_bulk_delete_request_duration = object_storage_request_duration.with_label_values(["delete_objects"]); + let object_storage_get_object_duration = + object_storage_request_duration.with_label_values(["get_object"]); + let object_storage_put_object_duration = + object_storage_request_duration.with_label_values(["put_object"]); + let object_storage_upload_part_duration = + object_storage_request_duration.with_label_values(["upload_part"]); StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), @@ -159,6 +169,9 @@ impl Default for StorageMetrics { object_storage_bulk_delete_requests_total, object_storage_delete_request_duration, object_storage_bulk_delete_request_duration, + object_storage_get_object_duration, + object_storage_put_object_duration, + object_storage_upload_part_duration, } } } diff --git a/quickwit/quickwit-storage/src/object_storage/policy.rs b/quickwit/quickwit-storage/src/object_storage/policy.rs index 6ce48ab7a94..797cd612029 100644 --- a/quickwit/quickwit-storage/src/object_storage/policy.rs +++ b/quickwit/quickwit-storage/src/object_storage/policy.rs @@ -67,9 +67,10 @@ impl MultiPartPolicy { impl Default for MultiPartPolicy { fn default() -> Self { MultiPartPolicy { - // S3 limits part size from 5M to 5GB, we want to end up with as few parts as possible - // since each part is charged as a put request. - target_part_num_bytes: 5_000_000_000, // 5GB + /// We want to balance time spent waiting on uploads while still being careful about + /// not hammering S3 Puts, which can be expensive. + /// TODO: Dynamic multipart policy. + target_part_num_bytes: 512 * 1_024 * 1_024, // 5GB multipart_threshold_num_bytes: 128 * 1_024 * 1_024, // 128 MiB max_num_parts: 10_000, max_object_num_bytes: 5_000_000_000_000u64, // S3 allows up to 5TB objects diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 5214b2bc37f..0b58858b786 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -409,6 +409,7 @@ impl S3CompatibleObjectStorage { Ok(delete_requests) } + #[tracing::instrument(skip_all, fields(part_number = part.part_number, num_bytes=part.len()))] async fn upload_part<'a>( &'a self, upload_id: MultipartUploadId,