Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/quickwit-compaction/src/compaction_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use quickwit_indexing::actors::{
MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType,
};
use quickwit_indexing::merge_policy::MergeOperation;
use quickwit_indexing::metrics::INDEXER_METRICS;
use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox};
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::metastore::MetastoreServiceClient;
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-compaction/src/compactor_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ impl CompactorSupervisor {
error!(%task_id, %error, "failed to spawn compaction task");
}
}
let available_slots = self.pipelines.iter().filter(|pipeline_opt| pipeline_opt.is_none()).count();
COMPACTOR_METRICS.available_slots.set(available_slots as i64);
}

async fn spawn_task(
Expand Down Expand Up @@ -221,7 +223,6 @@ impl CompactorSupervisor {
.filter(|s| matches!(s.status, PipelineStatus::InProgress))
.count();
let available_slots = (self.pipelines.len() - in_progress_count) as i64;
COMPACTOR_METRICS.available_slots.set(available_slots);

let mut in_progress = Vec::new();
let mut successes = Vec::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::planner::metrics::COMPACTION_PLANNER_METRICS;
/// exists. Splits beyond this cap aren't lost -- they bubble into range as the front of the queue
/// is merged off.
const SCAN_PAGE_SIZE: usize = 5_000;

#[derive(Debug)]
pub struct CompactionPlanner {
state: CompactionState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl CompactionState {
break;
}
for operation in operations {
for split in operation.splits_as_slice() {
for split in &operation.splits {
self.needs_compaction_split_ids.remove(split.split_id());
self.in_flight_split_ids
.insert(split.split_id().to_string());
Expand Down
48 changes: 23 additions & 25 deletions quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,32 +126,30 @@ impl MergeSplitDownloader {
download_directory: &Path,
ctx: &ActorContext<Self>,
) -> Result<Vec<Box<dyn Directory>>, quickwit_actors::ActorExitStatus> {
// we download all of the split files in the scratch directory.
let mut tantivy_dirs = Vec::new();
for split in splits {
if ctx.kill_switch().is_dead() {
debug!(
split_id = split.split_id(),
"Kill switch was activated. Cancelling download."
);
return Err(ActorExitStatus::Killed);
}
let io_controls = self
.io_controls
.clone()
.set_progress(ctx.progress().clone())
.set_kill_switch(ctx.kill_switch().clone());
let _protect_guard = ctx.protect_zone();
let tantivy_dir = self
.split_store
.fetch_and_open_split(split.split_id(), download_directory, &io_controls)
.await
.map_err(|error| {
let split_id = split.split_id();
anyhow::anyhow!(error).context(format!("failed to download split `{split_id}`"))
})?;
tantivy_dirs.push(tantivy_dir);
if ctx.kill_switch().is_dead() {
debug!("kill switch was activated, cancelling download");
return Err(ActorExitStatus::Killed);
}
let io_controls = self
.io_controls
.clone()
.set_progress(ctx.progress().clone())
.set_kill_switch(ctx.kill_switch().clone());
let _protect_guard = ctx.protect_zone();
let download_futures = splits.iter().map(|split| {
let io_controls = io_controls.clone();
async move {
self.split_store
.fetch_and_open_split(split.split_id(), download_directory, &io_controls)
.await
.map_err(|error| {
let split_id = split.split_id();
anyhow::anyhow!(error)
.context(format!("failed to download split `{split_id}`"))
})
}
});
let tantivy_dirs = futures::future::try_join_all(download_futures).await?;
Ok(tantivy_dirs)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- no-transaction
-- CONCURRENTLY matches the up migration: avoids blocking writes on `splits`
-- while the index is being dropped. Cannot run inside a transaction.
DROP INDEX CONCURRENTLY IF EXISTS splits_published_maturity_timestamp_idx;
DROP INDEX CONCURRENTLY IF EXISTS splits_published_maturity_timestamp_idx;
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
-- transaction block, hence the `-- no-transaction` directive above.
CREATE INDEX CONCURRENTLY IF NOT EXISTS splits_published_maturity_timestamp_idx
ON splits (maturity_timestamp, split_id)
WHERE split_state = 'Published';
WHERE split_state = 'Published';
19 changes: 16 additions & 3 deletions quickwit/quickwit-storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::sync::RwLock;

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec,
new_gauge, new_histogram_vec,
GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, exponential_buckets, new_counter,
new_counter_vec, new_gauge, new_histogram_vec,
};
use quickwit_config::CacheConfig;

Expand Down Expand Up @@ -48,6 +48,9 @@ pub struct StorageMetrics {
pub object_storage_bulk_delete_requests_total: IntCounter,
pub object_storage_delete_request_duration: Histogram,
pub object_storage_bulk_delete_request_duration: Histogram,
pub object_storage_get_object_duration: Histogram,
pub object_storage_put_object_duration: Histogram,
pub object_storage_upload_part_duration: Histogram,
}

impl Default for StorageMetrics {
Expand Down Expand Up @@ -86,12 +89,19 @@ impl Default for StorageMetrics {
"storage",
&[],
["action"],
vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
exponential_buckets(0.001, 2.0, 18)
.expect("object storage duration buckets should be valid"),
);
let object_storage_delete_request_duration =
object_storage_request_duration.with_label_values(["delete_object"]);
let object_storage_bulk_delete_request_duration =
object_storage_request_duration.with_label_values(["delete_objects"]);
let object_storage_get_object_duration =
object_storage_request_duration.with_label_values(["get_object"]);
let object_storage_put_object_duration =
object_storage_request_duration.with_label_values(["put_object"]);
let object_storage_upload_part_duration =
object_storage_request_duration.with_label_values(["upload_part"]);

StorageMetrics {
fast_field_cache: CacheMetrics::for_component("fastfields"),
Expand Down Expand Up @@ -159,6 +169,9 @@ impl Default for StorageMetrics {
object_storage_bulk_delete_requests_total,
object_storage_delete_request_duration,
object_storage_bulk_delete_request_duration,
object_storage_get_object_duration,
object_storage_put_object_duration,
object_storage_upload_part_duration,
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions quickwit/quickwit-storage/src/object_storage/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ impl MultiPartPolicy {
impl Default for MultiPartPolicy {
fn default() -> Self {
MultiPartPolicy {
// S3 limits part size from 5M to 5GB, we want to end up with as few parts as possible
// since each part is charged as a put request.
target_part_num_bytes: 5_000_000_000, // 5GB
// Balance the time spent waiting on each part upload against the number of S3 PUT
// requests issued, which can be expensive.
// TODO: Dynamic multipart policy.
target_part_num_bytes: 512 * 1_024 * 1_024, // 512 MiB
Comment thread
nadav-govari marked this conversation as resolved.
Comment thread
nadav-govari marked this conversation as resolved.
multipart_threshold_num_bytes: 128 * 1_024 * 1_024, // 128 MiB
max_num_parts: 10_000,
max_object_num_bytes: 5_000_000_000_000u64, // S3 allows up to 5TB objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ impl S3CompatibleObjectStorage {
Ok(delete_requests)
}

#[tracing::instrument(skip_all, fields(part_number = part.part_number, num_bytes=part.len()))]
async fn upload_part<'a>(
&'a self,
upload_id: MultipartUploadId,
Expand Down
Loading