diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index b10afbc8b0b..556f17be4b8 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -80,8 +80,9 @@ pub use crate::node_config::{ }; use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ - AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig, - S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, + AzureStorageConfig, ChecksumStrategy, FileStorageConfig, GoogleCloudStorageConfig, + RamStorageConfig, S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, + StorageConfigs, }; /// Returns true if the ingest API v2 is enabled. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index 88795a9228c..736d29aa7e6 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -38,6 +38,20 @@ pub enum StorageBackend { S3, } +/// Strategy used to checksum object-storage uploads. +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ChecksumStrategy { + /// CRC32C, computed and validated by the AWS SDK. Native S3 default. + #[default] + Crc32c, + /// MD5 (Content-MD5 header), computed client-side. Used by S3-compatible + /// implementations that predate the SDK's `x-amz-checksum-*` headers. + Md5, + /// No upload checksum is sent and no response checksum is validated. + Disabled, +} + #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum StorageBackendFlavor { @@ -330,7 +344,7 @@ pub struct S3StorageConfig { #[serde(default)] pub disable_multipart_upload: bool, #[serde(default)] - pub disable_checksums: bool, + pub checksum_strategy: ChecksumStrategy, #[serde(default)] pub disable_stalled_stream_protection_upload: bool, #[serde(default)] @@ -343,22 +357,22 @@ impl S3StorageConfig { Some(StorageBackendFlavor::DigitalOcean) => { self.force_path_style_access = true; self.disable_multi_object_delete = true; - self.disable_checksums = true; + self.checksum_strategy = ChecksumStrategy::Disabled; } Some(StorageBackendFlavor::Garage) => { self.region = Some("garage".to_string()); self.force_path_style_access = true; - self.disable_checksums = true; + self.checksum_strategy = ChecksumStrategy::Disabled; } Some(StorageBackendFlavor::Gcs) => { self.disable_multi_object_delete = true; self.disable_multipart_upload = true; - self.disable_checksums = true; + self.checksum_strategy = ChecksumStrategy::Disabled; } Some(StorageBackendFlavor::MinIO) => { self.region = Some("minio".to_string()); self.force_path_style_access = true; - self.disable_checksums = true; + self.checksum_strategy = ChecksumStrategy::Disabled; } _ => {} } @@ -404,7 +418,7 @@ impl fmt::Debug for S3StorageConfig { &self.disable_multi_object_delete, ) .field("disable_multipart_upload", &self.disable_multipart_upload) - .field("disable_checksums", &self.disable_checksums) + .field("checksum_strategy", &self.checksum_strategy) .field( "disable_stalled_stream_protection_upload", &self.disable_stalled_stream_protection_upload, @@ -647,7 +661,7 @@ mod tests { force_path_style_access: true disable_multi_object_delete_requests: true disable_multipart_upload: true - disable_checksums: true + checksum_strategy: disabled disable_stalled_stream_protection_upload: true disable_stalled_stream_protection_download: true "#; @@ -660,7 +674,7 @@ mod tests { force_path_style_access: true, disable_multi_object_delete: true, disable_multipart_upload: true, - disable_checksums: true, + checksum_strategy: ChecksumStrategy::Disabled, disable_stalled_stream_protection_upload: true, disable_stalled_stream_protection_download: true, ..Default::default() diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index a407ccdaeca..9f9766baf9e 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -33,7 +33,9 @@ use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput; use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput}; use aws_sdk_s3::primitives::{AggregatedBytes, ByteStream}; use aws_sdk_s3::types::builders::ObjectIdentifierBuilder; -use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier}; +use aws_sdk_s3::types::{ + ChecksumAlgorithm, CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier, +}; use base64::prelude::{BASE64_STANDARD, Engine}; use bytes::Bytes; use futures::{StreamExt, stream}; @@ -42,7 +44,7 @@ use quickwit_aws::{aws_behavior_version, get_aws_config}; use quickwit_common::retry::{Retry, RetryParams}; use quickwit_common::uri::Uri; use quickwit_common::{chunk_range, into_u64_range}; -use quickwit_config::S3StorageConfig; +use quickwit_config::{ChecksumStrategy, S3StorageConfig}; use quickwit_metrics::HistogramTimer; use regex::Regex; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf}; @@ -94,6 +96,7 @@ pub struct S3CompatibleObjectStorage { retry_params: RetryParams, disable_multi_object_delete: bool, disable_multipart_upload: bool, + checksum_strategy: ChecksumStrategy, } impl fmt::Debug for S3CompatibleObjectStorage { @@ -154,7 +157,10 @@ pub async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { s3_config.set_stalled_stream_protection(Some(stalled_stream_protection)); s3_config.set_timeout_config(aws_config.timeout_config().cloned()); - if s3_storage_config.disable_checksums { + if matches!( + s3_storage_config.checksum_strategy, + ChecksumStrategy::Disabled + ) { s3_config.set_request_checksum_calculation(Some(RequestChecksumCalculation::WhenRequired)); s3_config.set_response_checksum_validation(Some(ResponseChecksumValidation::WhenRequired)); } @@ -198,6 +204,7 @@ impl S3CompatibleObjectStorage { retry_params, disable_multi_object_delete, disable_multipart_upload, + checksum_strategy: s3_storage_config.checksum_strategy, }) } @@ -215,6 +222,7 @@ impl S3CompatibleObjectStorage { retry_params: self.retry_params, disable_multi_object_delete: self.disable_multi_object_delete, disable_multipart_upload: self.disable_multipart_upload, + checksum_strategy: self.checksum_strategy, } } @@ -244,19 +252,13 @@ pub fn parse_s3_uri(uri: &Uri) -> Option<(String, PathBuf)> { Some((bucket, prefix)) } -#[derive(Clone, Debug)] -struct MultipartUploadId(pub String); - -#[derive(Clone, Debug)] -struct Part { - pub part_number: usize, - pub range: Range, - pub md5: md5::Digest, -} - -impl Part { - fn len(&self) -> u64 { - self.range.end - self.range.start +/// Maps a [`ChecksumStrategy`] onto the AWS SDK's flexible-checksum algorithm. +/// `Md5` returns `None` because the S3 SDK silently no-ops `ChecksumAlgorithm::Md5`; +/// MD5 is instead sent via the legacy `Content-MD5` header, computed client-side. +fn aws_checksum_algorithm(strategy: ChecksumStrategy) -> Option { + match strategy { + ChecksumStrategy::Crc32c => Some(ChecksumAlgorithm::Crc32C), + ChecksumStrategy::Md5 | ChecksumStrategy::Disabled => None, } } @@ -274,6 +276,24 @@ async fn compute_md5(mut read: T) -> io::Resu } } +#[derive(Clone, Debug)] +struct MultipartUploadId(pub String); + +#[derive(Clone, Debug)] +struct Part { + pub part_number: usize, + pub range: Range, + /// Pre-computed MD5 of the part bytes; only populated when + /// [`ChecksumStrategy::Md5`] is in use. + pub md5: Option, +} + +impl Part { + fn len(&self) -> u64 { + self.range.end - self.range.start + } +} + impl S3CompatibleObjectStorage { fn key(&self, relative_path: &Path) -> String { // FIXME: This may not work on Windows. @@ -310,6 +330,7 @@ impl S3CompatibleObjectStorage { .key(key) .body(body) .content_length(len as i64) + .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) .send() .await .map_err(|sdk_error| { @@ -322,6 +343,7 @@ impl S3CompatibleObjectStorage { Ok(()) } + #[tracing::instrument(skip_all)] async fn put_single_part<'a>( &'a self, key: &'a str, @@ -343,6 +365,7 @@ impl S3CompatibleObjectStorage { self.s3_client .create_multipart_upload() .bucket(self.bucket.clone()) + .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) .key(key) .send() .await @@ -356,32 +379,39 @@ impl S3CompatibleObjectStorage { Ok(MultipartUploadId(upload_id)) } + /// Returns the MD5 of the byte range when the configured strategy is + /// [`ChecksumStrategy::Md5`], otherwise `None` (no I/O performed). + async fn maybe_compute_part_md5( + &self, + payload: &dyn crate::PutPayload, + range: Range, + ) -> io::Result> { + if !matches!(self.checksum_strategy, ChecksumStrategy::Md5) { + return Ok(None); + } + let read = payload.range_byte_stream(range).await?.into_async_read(); + Ok(Some(compute_md5(read).await?)) + } + async fn create_multipart_requests( &self, - payload: Box, + payload: &dyn crate::PutPayload, len: u64, part_len: u64, ) -> io::Result> { assert!(len > 0); - let multipart_ranges = chunk_range(0..len as usize, part_len as usize) + let multipart_ranges: Vec> = chunk_range(0..len as usize, part_len as usize) .map(into_u64_range) - .collect::>(); - + .collect(); let mut parts = Vec::with_capacity(multipart_ranges.len()); - for (multipart_id, multipart_range) in multipart_ranges.into_iter().enumerate() { - let read = payload - .range_byte_stream(multipart_range.clone()) - .await? - .into_async_read(); - let md5 = compute_md5(read).await?; - - let part = Part { + parts.push(Part { part_number: multipart_id + 1, // parts are 1-indexed + md5: self + .maybe_compute_part_md5(payload, multipart_range.clone()) + .await?, range: multipart_range, - md5, - }; - parts.push(part); + }); } Ok(parts) } @@ -420,6 +450,7 @@ impl S3CompatibleObjectStorage { Ok(delete_requests) } + #[tracing::instrument(skip_all, fields(part_number = part.part_number, num_bytes=part.len()))] async fn upload_part<'a>( &'a self, upload_id: MultipartUploadId, @@ -432,33 +463,39 @@ impl S3CompatibleObjectStorage { .await .map_err(StorageError::from) .map_err(Retry::Permanent)?; - let md5 = BASE64_STANDARD.encode(part.md5.0); crate::metrics::OBJECT_STORAGE_PUT_PARTS.inc(); crate::metrics::OBJECT_STORAGE_UPLOAD_NUM_BYTES.inc_by(part.len()); + let content_md5 = part.md5.map(|digest| BASE64_STANDARD.encode(digest.0)); let upload_part_output = self .s3_client .upload_part() .bucket(self.bucket.clone()) + .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_strategy)) + // None if checksum isnt md5. + .set_content_md5(content_md5) .key(key) .body(byte_stream) .content_length(part.len() as i64) - .content_md5(md5) .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() .await - .map_err(|sdk_error| { - if sdk_error.is_retryable() { - Retry::Transient(StorageError::from(sdk_error)) + .map_err(|s3_err| { + if s3_err.is_retryable() { + Retry::Transient(StorageError::from(s3_err)) } else { - Retry::Permanent(StorageError::from(sdk_error)) + Retry::Permanent(StorageError::from(s3_err)) } })?; + // Only one checksum field is populated by the SDK, matching the algorithm we + // advertised on the upload; the rest stay `None`. let completed_part = CompletedPart::builder() .set_e_tag(upload_part_output.e_tag) + .set_checksum_crc32_c(upload_part_output.checksum_crc32_c) + .set_checksum_md5(upload_part_output.checksum_md5) .part_number(part.part_number as i32) .build(); Ok(completed_part) @@ -473,7 +510,7 @@ impl S3CompatibleObjectStorage { ) -> StorageResult<()> { let upload_id = self.create_multipart_upload(key).await?; let parts = self - .create_multipart_requests(payload.clone(), total_len, part_len) + .create_multipart_requests(payload.as_ref(), total_len, part_len) .await?; let max_concurrent_upload = self.multipart_policy.max_concurrent_uploads(); let completed_parts_res: StorageResult> = @@ -488,7 +525,7 @@ impl S3CompatibleObjectStorage { .collect::>() .await .into_iter() - .map(|res| res.map_err(|error| error.into_inner())) + .map(|res| res.map_err(|e| e.into_inner())) .collect(); match completed_parts_res { Ok(completed_parts) => { @@ -928,7 +965,6 @@ mod tests { let data = (0..1_500_000).map(|el| el as u8).collect::>(); let md5 = compute_md5(data.as_slice()).await?; assert_eq!(md5, md5::compute(data)); - Ok(()) } @@ -994,6 +1030,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; assert_eq!( s3_storage.relative_path("indexes/foo"), @@ -1041,6 +1078,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: true, disable_multipart_upload: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1078,6 +1116,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; let _ = s3_storage .bulk_delete(&[Path::new("foo"), Path::new("bar")]) @@ -1160,6 +1199,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; let bulk_delete_error = s3_storage .bulk_delete(&[ @@ -1251,6 +1291,7 @@ mod tests { retry_params: RetryParams::for_test(), disable_multi_object_delete: false, disable_multipart_upload: false, + checksum_strategy: ChecksumStrategy::Crc32c, }; s3_storage .put(Path::new("my-path"), Box::new(vec![1, 2, 3]))