From 207cf769af14316414f012bf2fc8585abd6ea140 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Mon, 18 May 2026 16:20:49 -0400 Subject: [PATCH 1/7] Use CRC32C for checksums instead of MD5 --- .../object_storage/s3_compatible_storage.rs | 83 +++++-------------- 1 file changed, 22 insertions(+), 61 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index a407ccdaeca..59d1f640733 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -33,8 +33,9 @@ use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput; use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput}; use aws_sdk_s3::primitives::{AggregatedBytes, ByteStream}; use aws_sdk_s3::types::builders::ObjectIdentifierBuilder; -use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier}; -use base64::prelude::{BASE64_STANDARD, Engine}; +use aws_sdk_s3::types::{ + ChecksumAlgorithm, CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, +}; use bytes::Bytes; use futures::{StreamExt, stream}; use quickwit_aws::retry::{AwsRetryable, aws_retry}; @@ -45,7 +46,7 @@ use quickwit_common::{chunk_range, into_u64_range}; use quickwit_config::S3StorageConfig; use quickwit_metrics::HistogramTimer; use regex::Regex; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf}; +use tokio::io::{AsyncRead, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; use tracing::{info, instrument, warn}; @@ -251,7 +252,6 @@ struct MultipartUploadId(pub String); struct Part { pub part_number: usize, pub range: Range, - pub md5: md5::Digest, } impl Part { @@ -260,20 +260,6 @@ impl Part { } } -const MD5_CHUNK_SIZE: usize = 1_000_000; - -async fn compute_md5(mut read: T) -> io::Result { - let mut checksum = md5::Context::new(); - let 
mut buf = vec![0; MD5_CHUNK_SIZE]; - loop { - let read_len = read.read(&mut buf).await?; - checksum.consume(&buf[..read_len]); - if read_len == 0 { - return Ok(checksum.finalize()); - } - } -} - impl S3CompatibleObjectStorage { fn key(&self, relative_path: &Path) -> String { // FIXME: This may not work on Windows. @@ -310,6 +296,7 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64) + .checksum_algorithm(ChecksumAlgorithm::Crc32C) .send() .await .map_err(|sdk_error| { @@ -322,6 +309,7 @@ impl S3CompatibleObjectStorage { Ok(()) } + #[tracing::instrument(skip_all)] async fn put_single_part<'a>( &'a self, key: &'a str, @@ -343,6 +331,7 @@ impl S3CompatibleObjectStorage { self.s3_client .create_multipart_upload() .bucket(self.bucket.clone()) + .checksum_algorithm(ChecksumAlgorithm::Crc32C) .key(key) .send() .await @@ -356,34 +345,16 @@ impl S3CompatibleObjectStorage { Ok(MultipartUploadId(upload_id)) } - async fn create_multipart_requests( - &self, - payload: Box, - len: u64, - part_len: u64, - ) -> io::Result> { + fn create_multipart_requests(&self, len: u64, part_len: u64) -> Vec { assert!(len > 0); - let multipart_ranges = chunk_range(0..len as usize, part_len as usize) + chunk_range(0..len as usize, part_len as usize) .map(into_u64_range) - .collect::>(); - - let mut parts = Vec::with_capacity(multipart_ranges.len()); - - for (multipart_id, multipart_range) in multipart_ranges.into_iter().enumerate() { - let read = payload - .range_byte_stream(multipart_range.clone()) - .await? 
- .into_async_read(); - let md5 = compute_md5(read).await?; - - let part = Part { + .enumerate() + .map(|(multipart_id, multipart_range)| Part { part_number: multipart_id + 1, // parts are 1-indexed range: multipart_range, - md5, - }; - parts.push(part); - } - Ok(parts) + }) + .collect() } fn build_delete_batch_requests<'a>( @@ -420,6 +391,7 @@ impl S3CompatibleObjectStorage { Ok(delete_requests) } + #[tracing::instrument(skip_all, fields(part_number = part.part_number, num_bytes=part.len()))] async fn upload_part<'a>( &'a self, upload_id: MultipartUploadId, @@ -432,7 +404,6 @@ impl S3CompatibleObjectStorage { .await .map_err(StorageError::from) .map_err(Retry::Permanent)?; - let md5 = BASE64_STANDARD.encode(part.md5.0); crate::metrics::OBJECT_STORAGE_PUT_PARTS.inc(); crate::metrics::OBJECT_STORAGE_UPLOAD_NUM_BYTES.inc_by(part.len()); @@ -441,24 +412,25 @@ impl S3CompatibleObjectStorage { .s3_client .upload_part() .bucket(self.bucket.clone()) + .checksum_algorithm(ChecksumAlgorithm::Crc32C) .key(key) .body(byte_stream) .content_length(part.len() as i64) - .content_md5(md5) .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() .await - .map_err(|sdk_error| { - if sdk_error.is_retryable() { - Retry::Transient(StorageError::from(sdk_error)) + .map_err(|s3_err| { + if s3_err.is_retryable() { + Retry::Transient(StorageError::from(s3_err)) } else { - Retry::Permanent(StorageError::from(sdk_error)) + Retry::Permanent(StorageError::from(s3_err)) } })?; let completed_part = CompletedPart::builder() .set_e_tag(upload_part_output.e_tag) + .set_checksum_crc32_c(upload_part_output.checksum_crc32_c) .part_number(part.part_number as i32) .build(); Ok(completed_part) @@ -472,9 +444,7 @@ impl S3CompatibleObjectStorage { total_len: u64, ) -> StorageResult<()> { let upload_id = self.create_multipart_upload(key).await?; - let parts = self - .create_multipart_requests(payload.clone(), total_len, part_len) - .await?; + let parts = 
self.create_multipart_requests(total_len, part_len); let max_concurrent_upload = self.multipart_policy.max_concurrent_uploads(); let completed_parts_res: StorageResult> = stream::iter(parts.into_iter().map(|part| { @@ -488,7 +458,7 @@ impl S3CompatibleObjectStorage { .collect::>() .await .into_iter() - .map(|res| res.map_err(|error| error.into_inner())) + .map(|res| res.map_err(|e| e.into_inner())) .collect(); match completed_parts_res { Ok(completed_parts) => { @@ -923,15 +893,6 @@ mod tests { use super::*; use crate::{MultiPartPolicy, S3CompatibleObjectStorage}; - #[tokio::test] - async fn test_md5_calc() -> std::io::Result<()> { - let data = (0..1_500_000).map(|el| el as u8).collect::>(); - let md5 = compute_md5(data.as_slice()).await?; - assert_eq!(md5, md5::compute(data)); - - Ok(()) - } - #[test] fn test_split_range_into_chunks_inexact() { assert_eq!( From 600281b691f57b50721d049d60b25e4bec23e74f Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 19 May 2026 10:43:28 -0400 Subject: [PATCH 2/7] Only apply crc32c on non-S3 flavors --- .../object_storage/s3_compatible_storage.rs | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 59d1f640733..9c7f0e6b163 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -43,8 +43,8 @@ use quickwit_aws::{aws_behavior_version, get_aws_config}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::S3StorageConfig; use quickwit_metrics::HistogramTimer; +use quickwit_config::{S3StorageConfig, StorageBackendFlavor}; use regex::Regex; use tokio::io::{AsyncRead, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; @@ -95,6 +95,9 @@ 
pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + // `None` means native AWS S3. Non-S3 flavors (GCS, MinIO, Garage, DigitalOcean) do not + // reliably honor CRC32C, so we only request the SDK to compute & send it for native S3. + flavor: Option, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -199,6 +202,7 @@ impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, + flavor: s3_storage_config.flavor, }) } @@ -216,6 +220,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + flavor: self.flavor, } } @@ -261,6 +266,15 @@ impl Part { } impl S3CompatibleObjectStorage { + /// Returns the checksum algorithm to advertise on uploads, or `None` to omit it. + /// Only native AWS S3 reliably supports CRC32C. + fn upload_checksum_algorithm(&self) -> Option { + match self.flavor { + None => Some(ChecksumAlgorithm::Crc32C), + Some(_) => None, + } + } + fn key(&self, relative_path: &Path) -> String { // FIXME: This may not work on Windows. 
let key_path = self.prefix.join(relative_path); @@ -296,7 +310,7 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64) - .checksum_algorithm(ChecksumAlgorithm::Crc32C) + .set_checksum_algorithm(self.upload_checksum_algorithm()) .send() .await .map_err(|sdk_error| { @@ -331,7 +345,7 @@ impl S3CompatibleObjectStorage { self.s3_client .create_multipart_upload() .bucket(self.bucket.clone()) - .checksum_algorithm(ChecksumAlgorithm::Crc32C) + .set_checksum_algorithm(self.upload_checksum_algorithm()) .key(key) .send() .await @@ -412,7 +426,7 @@ impl S3CompatibleObjectStorage { .s3_client .upload_part() .bucket(self.bucket.clone()) - .checksum_algorithm(ChecksumAlgorithm::Crc32C) + .set_checksum_algorithm(self.upload_checksum_algorithm()) .key(key) .body(byte_stream) .content_length(part.len() as i64) @@ -955,6 +969,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + flavor: None, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1002,6 +1017,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, + flavor: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1039,6 +1055,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + flavor: None, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1121,6 +1138,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + flavor: None, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1212,6 +1230,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + flavor: None, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3])) From 78d6275ffff7ff945a4dd9f6ba91d18a4157c971 Mon 
Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 19 May 2026 11:18:52 -0400 Subject: [PATCH 3/7] lints --- quickwit/Cargo.lock | 1 - quickwit/quickwit-storage/Cargo.toml | 1 - .../src/object_storage/s3_compatible_storage.rs | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 97b24e0725d..41f3a1daee0 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -9390,7 +9390,6 @@ dependencies = [ "azure_identity", "azure_storage", "azure_storage_blobs", - "base64 0.22.1", "bytes", "bytesize", "fnv", diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml index 8fe4701f631..9a1f4344289 100644 --- a/quickwit/quickwit-storage/Cargo.toml +++ b/quickwit/quickwit-storage/Cargo.toml @@ -13,7 +13,6 @@ license.workspace = true [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } -base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } fnv = { workspace = true } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 9c7f0e6b163..983169730f6 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -43,8 +43,8 @@ use quickwit_aws::{aws_behavior_version, get_aws_config}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_metrics::HistogramTimer; use quickwit_config::{S3StorageConfig, StorageBackendFlavor}; +use quickwit_metrics::HistogramTimer; use regex::Regex; use tokio::io::{AsyncRead, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; From f7f6a45788419146f035a8afca6780cbb449bd42 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 19 May 2026 14:45:44 -0400 Subject: [PATCH 4/7] Make crc32 default 
and a turn-off button for it --- .../object_storage/s3_compatible_storage.rs | 35 +++++++------------ 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 983169730f6..556b2cdd62c 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -43,7 +43,7 @@ use quickwit_aws::{aws_behavior_version, get_aws_config}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::{S3StorageConfig, StorageBackendFlavor}; +use quickwit_config::S3StorageConfig; use quickwit_metrics::HistogramTimer; use regex::Regex; use tokio::io::{AsyncRead, AsyncWriteExt, BufReader, ReadBuf}; @@ -95,9 +95,7 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, - // `None` means native AWS S3. Non-S3 flavors (GCS, MinIO, Garage, DigitalOcean) do not - // reliably honor CRC32C, so we only request the SDK to compute & send it for native S3. 
- flavor: Option, + disable_checksums: bool, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -202,7 +200,7 @@ impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, - flavor: s3_storage_config.flavor, + disable_checksums: s3_storage_config.disable_checksums, }) } @@ -220,7 +218,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, - flavor: self.flavor, + disable_checksums: self.disable_checksums, } } @@ -266,15 +264,6 @@ impl Part { } impl S3CompatibleObjectStorage { - /// Returns the checksum algorithm to advertise on uploads, or `None` to omit it. - /// Only native AWS S3 reliably supports CRC32C. - fn upload_checksum_algorithm(&self) -> Option { - match self.flavor { - None => Some(ChecksumAlgorithm::Crc32C), - Some(_) => None, - } - } - fn key(&self, relative_path: &Path) -> String { // FIXME: This may not work on Windows. 
let key_path = self.prefix.join(relative_path); @@ -310,7 +299,7 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64) - .set_checksum_algorithm(self.upload_checksum_algorithm()) + .set_checksum_algorithm((!self.disable_checksums).then_some(ChecksumAlgorithm::Crc32C)) .send() .await .map_err(|sdk_error| { @@ -345,7 +334,7 @@ impl S3CompatibleObjectStorage { self.s3_client .create_multipart_upload() .bucket(self.bucket.clone()) - .set_checksum_algorithm(self.upload_checksum_algorithm()) + .set_checksum_algorithm((!self.disable_checksums).then_some(ChecksumAlgorithm::Crc32C)) .key(key) .send() .await @@ -426,7 +415,7 @@ impl S3CompatibleObjectStorage { .s3_client .upload_part() .bucket(self.bucket.clone()) - .set_checksum_algorithm(self.upload_checksum_algorithm()) + .set_checksum_algorithm((!self.disable_checksums).then_some(ChecksumAlgorithm::Crc32C)) .key(key) .body(byte_stream) .content_length(part.len() as i64) @@ -969,7 +958,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, - flavor: None, + disable_checksums: false, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1017,7 +1006,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, - flavor: None, + disable_checksums: false, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1055,7 +1044,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, - flavor: None, + disable_checksums: false, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1138,7 +1127,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, - flavor: None, + disable_checksums: false, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1230,7 +1219,7 @@ mod tests { 
retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, - flavor: None, + disable_checksums: false, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3])) From 92d1fbfa0b5a43fca12dc1d04d4701c7c1251250 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 19 May 2026 15:28:20 -0400 Subject: [PATCH 5/7] lints --- .../src/object_storage/s3_compatible_storage.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 556b2cdd62c..e315eadac86 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -334,7 +334,9 @@ impl S3CompatibleObjectStorage { self.s3_client .create_multipart_upload() .bucket(self.bucket.clone()) - .set_checksum_algorithm((!self.disable_checksums).then_some(ChecksumAlgorithm::Crc32C)) + .set_checksum_algorithm( + (!self.disable_checksums).then_some(ChecksumAlgorithm::Crc32C), + ) .key(key) .send() .await From 9f2cfdf0c979590eea40442172e5508603c6a1fa Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 19 May 2026 17:39:44 -0400 Subject: [PATCH 6/7] Make it an enum --- quickwit/quickwit-config/src/lib.rs | 5 ++- .../quickwit-config/src/storage_config.rs | 30 +++++++++---- .../object_storage/s3_compatible_storage.rs | 45 ++++++++++++------- 3 files changed, 55 insertions(+), 25 deletions(-) diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index b10afbc8b0b..556f17be4b8 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,8 +80,9 @@ pub use crate::node_config::{ }; use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ - AzureStorageConfig, FileStorageConfig, 
GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, + AzureStorageConfig, ChecksumStrategy, FileStorageConfig, GoogleCloudStorageConfig, + RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, + StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 88795a9228c..736d29aa7e6 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -38,6 +38,20 @@ pub enum StorageBackend { S3, } +/// Strategy used to checksum object-storage uploads. +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ChecksumStrategy { + /// CRC32C, computed and validated by the AWS SDK. Native S3 default. + #[default] + Crc32c, + /// MD5 (Content-MD5 header), computed client-side. Used by S3-compatible + /// implementations that predate the SDK's `x-amz-checksum-*` headers. + Md5, + /// No upload checksum is sent and no response checksum is validated. 
+ Disabled, +} + #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum StorageBackendFlavor { @@ -330,7 +344,7 @@ pub struct S3StorageConfig { #[serde(default)] pub disable_multipart_upload: bool, #[serde(default)] - pub disable_checksums: bool, + pub checksum_strategy: ChecksumStrategy, #[serde(default)] pub disable_stalled_stream_protection_upload: bool, #[serde(default)] @@ -343,22 +357,22 @@ impl S3StorageConfig { Some(StorageBackendFlavor::DigitalOcean) => { self.force_path_style_access = true; self.disable_multi_object_delete = true; - self.disable_checksums = true; + self.checksum_strategy = ChecksumStrategy::Disabled; } Some(StorageBackendFlavor::Garage) => { self.region = Some("garage".to_string()); self.force_path_style_access = true; - self.disable_checksums = true; + self.checksum_strategy = ChecksumStrategy::Disabled; } Some(StorageBackendFlavor::Gcs) => { self.disable_multi_object_delete = true; self.disable_multipart_upload = true; - self.disable_checksums = true; + self.checksum_strategy = ChecksumStrategy::Disabled; } Some(StorageBackendFlavor::MinIO) => { self.region = Some("minio".to_string()); self.force_path_style_access = true; - self.disable_checksums = true; + self.checksum_strategy = ChecksumStrategy::Disabled; } _ => {} } @@ -404,7 +418,7 @@ impl fmt::Debug for S3StorageConfig { &self.disable_multi_object_delete, ) .field("disable_multipart_upload", &self.disable_multipart_upload) - .field("disable_checksums", &self.disable_checksums) + .field("checksum_strategy", &self.checksum_strategy) .field( "disable_stalled_stream_protection_upload", &self.disable_stalled_stream_protection_upload, @@ -647,7 +661,7 @@ mod tests { force_path_style_access: true disable_multi_object_delete_requests: true disable_multipart_upload: true - disable_checksums: true + checksum_strategy: disabled disable_stalled_stream_protection_upload: true 
disable_stalled_stream_protection_download: true "#; @@ -660,7 +674,7 @@ mod tests { force_path_style_access: true, disable_multi_object_delete: true, disable_multipart_upload: true, - disable_checksums: true, + checksum_strategy: ChecksumStrategy::Disabled, disable_stalled_stream_protection_upload: true, disable_stalled_stream_protection_download: true, ..Default::default() diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index e315eadac86..731f638b1cb 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -43,7 +43,7 @@ use quickwit_aws::{aws_behavior_version, get_aws_config}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::S3StorageConfig; +use quickwit_config::{ChecksumStrategy, S3StorageConfig}; use quickwit_metrics::HistogramTimer; use regex::Regex; use tokio::io::{AsyncRead, AsyncWriteExt, BufReader, ReadBuf}; @@ -95,7 +95,7 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, - disable_checksums: bool, + checksum_strategy: ChecksumStrategy, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -156,7 +156,10 @@ pub async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { s3_config.set_stalled_stream_protection(Some(stalled_stream_protection)); s3_config.set_timeout_config(aws_config.timeout_config().cloned()); - if s3_storage_config.disable_checksums { + if matches!( + s3_storage_config.checksum_strategy, + ChecksumStrategy::Disabled + ) { s3_config.set_request_checksum_calculation(Some(RequestChecksumCalculation::WhenRequired)); s3_config.set_response_checksum_validation(Some(ResponseChecksumValidation::WhenRequired)); } @@ -200,7 +203,7 @@ 
impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, - disable_checksums: s3_storage_config.disable_checksums, + checksum_strategy: s3_storage_config.checksum_strategy, }) } @@ -218,7 +221,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, - disable_checksums: self.disable_checksums, + checksum_strategy: self.checksum_strategy, } } @@ -248,6 +251,17 @@ pub fn parse_s3_uri(uri: &Uri) -> Option<(String, PathBuf)> { Some((bucket, prefix)) } +/// Maps a configured [`ChecksumStrategy`] onto the AWS SDK's [`ChecksumAlgorithm`]. +/// The SDK computes & trails both CRC32C and MD5 internally; `Disabled` returns `None` +/// so no algorithm is advertised. +fn aws_checksum_algorithm(strategy: ChecksumStrategy) -> Option { + match strategy { + ChecksumStrategy::Crc32c => Some(ChecksumAlgorithm::Crc32C), + ChecksumStrategy::Md5 => Some(ChecksumAlgorithm::Md5), + ChecksumStrategy::Disabled => None, + } +} + #[derive(Clone, Debug)] struct MultipartUploadId(pub String); @@ -299,7 +313,7 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64) - .set_checksum_algorithm((!self.disable_checksums).then_some(ChecksumAlgorithm::Crc32C)) + .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) .send() .await .map_err(|sdk_error| { @@ -334,9 +348,7 @@ impl S3CompatibleObjectStorage { self.s3_client .create_multipart_upload() .bucket(self.bucket.clone()) - .set_checksum_algorithm( - (!self.disable_checksums).then_some(ChecksumAlgorithm::Crc32C), - ) + .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) .key(key) .send() .await @@ -417,7 +429,7 @@ impl S3CompatibleObjectStorage { .s3_client .upload_part() .bucket(self.bucket.clone()) - .set_checksum_algorithm((!self.disable_checksums).then_some(ChecksumAlgorithm::Crc32C)) + 
.set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) .key(key) .body(byte_stream) .content_length(part.len() as i64) @@ -433,9 +445,12 @@ impl S3CompatibleObjectStorage { } })?; + // Only one checksum field is populated by the SDK, matching the algorithm we + // advertised on the upload; the rest stay `None`. let completed_part = CompletedPart::builder() .set_e_tag(upload_part_output.e_tag) .set_checksum_crc32_c(upload_part_output.checksum_crc32_c) + .set_checksum_md5(upload_part_output.checksum_md5) .part_number(part.part_number as i32) .build(); Ok(completed_part) @@ -960,7 +975,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, - disable_checksums: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1008,7 +1023,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, - disable_checksums: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1046,7 +1061,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, - disable_checksums: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1129,7 +1144,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, - disable_checksums: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1221,7 +1236,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, - disable_checksums: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3])) 
From ed54ce63f22e966fdf4a5b08f1e448e30169977b Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Tue, 19 May 2026 18:04:51 -0400 Subject: [PATCH 7/7] restore the md5 computation since its a noop on the aws sdk --- quickwit/Cargo.lock | 1 + quickwit/quickwit-storage/Cargo.toml | 1 + .../object_storage/s3_compatible_storage.rs | 81 ++++++++++++++++--- 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 41f3a1daee0..97b24e0725d 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -9390,6 +9390,7 @@ dependencies = [ "azure_identity", "azure_storage", "azure_storage_blobs", + "base64 0.22.1", "bytes", "bytesize", "fnv", diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml index 9a1f4344289..8fe4701f631 100644 --- a/quickwit/quickwit-storage/Cargo.toml +++ b/quickwit/quickwit-storage/Cargo.toml @@ -13,6 +13,7 @@ license.workspace = true [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } fnv = { workspace = true } diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 731f638b1cb..9f9766baf9e 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -36,6 +36,7 @@ use aws_sdk_s3::types::builders::ObjectIdentifierBuilder; use aws_sdk_s3::types::{ ChecksumAlgorithm, CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, }; +use base64::prelude::{BASE64_STANDARD, Engine}; use bytes::Bytes; use futures::{StreamExt, stream}; use quickwit_aws::retry::{AwsRetryable, aws_retry}; @@ -46,7 +47,7 @@ use quickwit_common::{chunk_range, into_u64_range}; use quickwit_config::{ChecksumStrategy, S3StorageConfig}; use 
quickwit_metrics::HistogramTimer; use regex::Regex; -use tokio::io::{AsyncRead, AsyncWriteExt, BufReader, ReadBuf}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf}; use tokio::sync::Semaphore; use tracing::{info, instrument, warn}; @@ -251,14 +252,27 @@ pub fn parse_s3_uri(uri: &Uri) -> Option<(String, PathBuf)> { Some((bucket, prefix)) } -/// Maps a configured [`ChecksumStrategy`] onto the AWS SDK's [`ChecksumAlgorithm`]. -/// The SDK computes & trails both CRC32C and MD5 internally; `Disabled` returns `None` -/// so no algorithm is advertised. +/// Maps a [`ChecksumStrategy`] onto the AWS SDK's flexible-checksum algorithm. +/// `Md5` returns `None` because the S3 SDK silently no-ops `ChecksumAlgorithm::Md5`; +/// MD5 is instead sent via the legacy `Content-MD5` header, computed client-side. fn aws_checksum_algorithm(strategy: ChecksumStrategy) -> Option { match strategy { ChecksumStrategy::Crc32c => Some(ChecksumAlgorithm::Crc32C), - ChecksumStrategy::Md5 => Some(ChecksumAlgorithm::Md5), - ChecksumStrategy::Disabled => None, + ChecksumStrategy::Md5 | ChecksumStrategy::Disabled => None, + } +} + +const MD5_CHUNK_SIZE: usize = 1_000_000; + +async fn compute_md5(mut read: T) -> io::Result { + let mut checksum = md5::Context::new(); + let mut buf = vec![0; MD5_CHUNK_SIZE]; + loop { + let read_len = read.read(&mut buf).await?; + checksum.consume(&buf[..read_len]); + if read_len == 0 { + return Ok(checksum.finalize()); + } } } @@ -269,6 +283,9 @@ struct MultipartUploadId(pub String); struct Part { pub part_number: usize, pub range: Range, + /// Pre-computed MD5 of the part bytes; only populated when + /// [`ChecksumStrategy::Md5`] is in use. 
+ pub md5: Option, } impl Part { @@ -362,16 +379,41 @@ impl S3CompatibleObjectStorage { Ok(MultipartUploadId(upload_id)) } - fn create_multipart_requests(&self, len: u64, part_len: u64) -> Vec { + /// Returns the MD5 of the byte range when the configured strategy is + /// [`ChecksumStrategy::Md5`], otherwise `None` (no I/O performed). + async fn maybe_compute_part_md5( + &self, + payload: &dyn crate::PutPayload, + range: Range, + ) -> io::Result> { + if !matches!(self.checksum_strategy, ChecksumStrategy::Md5) { + return Ok(None); + } + let read = payload.range_byte_stream(range).await?.into_async_read(); + Ok(Some(compute_md5(read).await?)) + } + + async fn create_multipart_requests( + &self, + payload: &dyn crate::PutPayload, + len: u64, + part_len: u64, + ) -> io::Result> { assert!(len > 0); - chunk_range(0..len as usize, part_len as usize) + let multipart_ranges: Vec> = chunk_range(0..len as usize, part_len as usize) .map(into_u64_range) - .enumerate() - .map(|(multipart_id, multipart_range)| Part { + .collect(); + let mut parts = Vec::with_capacity(multipart_ranges.len()); + for (multipart_id, multipart_range) in multipart_ranges.into_iter().enumerate() { + parts.push(Part { part_number: multipart_id + 1, // parts are 1-indexed + md5: self + .maybe_compute_part_md5(payload, multipart_range.clone()) + .await?, range: multipart_range, - }) - .collect() + }); + } + Ok(parts) } fn build_delete_batch_requests<'a>( @@ -425,11 +467,14 @@ impl S3CompatibleObjectStorage { crate::metrics::OBJECT_STORAGE_PUT_PARTS.inc(); crate::metrics::OBJECT_STORAGE_UPLOAD_NUM_BYTES.inc_by(part.len()); + let content_md5 = part.md5.map(|digest| BASE64_STANDARD.encode(digest.0)); let upload_part_output = self .s3_client .upload_part() .bucket(self.bucket.clone()) .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) + // `None` unless the checksum strategy is MD5.
+ .set_content_md5(content_md5) .key(key) .body(byte_stream) .content_length(part.len() as i64) @@ -464,7 +509,9 @@ impl S3CompatibleObjectStorage { total_len: u64, ) -> StorageResult<()> { let upload_id = self.create_multipart_upload(key).await?; - let parts = self.create_multipart_requests(total_len, part_len); + let parts = self + .create_multipart_requests(payload.as_ref(), total_len, part_len) + .await?; let max_concurrent_upload = self.multipart_policy.max_concurrent_uploads(); let completed_parts_res: StorageResult> = stream::iter(parts.into_iter().map(|part| { @@ -913,6 +960,14 @@ mod tests { use super::*; use crate::{MultiPartPolicy, S3CompatibleObjectStorage}; + #[tokio::test] + async fn test_md5_calc() -> std::io::Result<()> { + let data = (0..1_500_000).map(|el| el as u8).collect::>(); + let md5 = compute_md5(data.as_slice()).await?; + assert_eq!(md5, md5::compute(data)); + Ok(()) + } + #[test] fn test_split_range_into_chunks_inexact() { assert_eq!(