diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 466f631c53d81..6621706c35c81 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -19,7 +19,7 @@ use crate::page_filter::PagePruningAccessPlanFilter; use crate::row_filter::build_projection_read_plan; -use crate::row_group_filter::RowGroupAccessPlanFilter; +use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, @@ -70,7 +70,10 @@ use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, }; use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::parquet_column; use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; +use parquet::basic::Type; +use parquet::bloom_filter::Sbbf; use parquet::errors::ParquetError; use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; @@ -163,6 +166,9 @@ pub(super) struct ParquetOpener { /// PruneWithStatistics /// | /// v +/// LoadBloomFilters +/// | +/// v /// PruneWithBloomFilters /// | /// v @@ -196,10 +202,10 @@ enum ParquetOpenState { LoadPageIndex(BoxFuture<'static, Result>), /// Pruning Row Groups PruneWithStatistics(Box), - /// Pruning with Bloom Filters - /// - /// TODO: split state as this currently does both I/O and CPU work - PruneWithBloomFilters(BoxFuture<'static, Result>), + /// Loading bloom filters required for row-group pruning + LoadBloomFilters(BoxFuture<'static, Result>), + /// Pruning with preloaded Bloom Filters + PruneWithBloomFilters(Box), /// Builds the final reader stream /// /// TODO: split state as this currently does both I/O and CPU work. @@ -273,6 +279,17 @@ struct RowGroupsPrunedParquetOpen { row_groups: RowGroupAccessPlanFilter, } +/// State of [`ParquetOpenState`] +/// +/// Result of loading bloom filters needed for row-group pruning. +struct BloomFiltersLoadedParquetOpen { + prepared: RowGroupsPrunedParquetOpen, + /// Bloom filters loaded for each row group that remains under consideration. + /// + /// indexed by parquet row-group index + row_group_bloom_filters: Vec, +} + /// Implements state machine described in [`ParquetOpenState`] struct ParquetOpenFuture { state: ParquetOpenState, @@ -354,13 +371,16 @@ impl ParquetOpenState { } ParquetOpenState::PruneWithStatistics(prepared) => { let prepared_row_groups = prepared.prune_row_groups()?; - Ok(ParquetOpenState::PruneWithBloomFilters( - prepared_row_groups.prune_bloom_filters().boxed(), + Ok(ParquetOpenState::LoadBloomFilters( + prepared_row_groups.load_bloom_filters().boxed(), )) } - ParquetOpenState::PruneWithBloomFilters(future) => { - Ok(ParquetOpenState::PruneWithBloomFilters(future)) + ParquetOpenState::LoadBloomFilters(future) => { + Ok(ParquetOpenState::LoadBloomFilters(future)) } + ParquetOpenState::PruneWithBloomFilters(loaded) => Ok( + ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())), + ), ParquetOpenState::BuildStream(prepared) => { Ok(ParquetOpenState::Ready(prepared.build_stream()?)) } @@ -413,13 +433,13 @@ impl Future for ParquetOpenFuture { } }; } - ParquetOpenState::PruneWithBloomFilters(mut future) => { + ParquetOpenState::LoadBloomFilters(mut future) => { state = match future.poll_unpin(cx) { Poll::Ready(result) => { - ParquetOpenState::BuildStream(Box::new(result?)) + ParquetOpenState::PruneWithBloomFilters(Box::new(result?)) } Poll::Pending => { - self.state = ParquetOpenState::PruneWithBloomFilters(future); + self.state = ParquetOpenState::LoadBloomFilters(future); return Poll::Pending; } }; @@ -438,6 +458,7 @@ impl Future for ParquetOpenFuture { ParquetOpenState::PruneFile(_) => {} ParquetOpenState::PrepareFilters(_) => {} ParquetOpenState::PruneWithStatistics(_) => {} + ParquetOpenState::PruneWithBloomFilters(_) => {} ParquetOpenState::BuildStream(_) => {} }; @@ -862,8 +883,17 @@ impl FiltersPreparedParquetOpen { } impl RowGroupsPrunedParquetOpen { - /// Apply bloom filter pruning when it is enabled and a pruning predicate exists. - async fn prune_bloom_filters(mut self) -> Result { + /// Load bloom filters needed for pruning when enabled and a pruning predicate exists. + async fn load_bloom_filters(mut self) -> Result { + let num_row_groups = self + .prepared + .loaded + .reader_metadata + .metadata() + .num_row_groups(); + let mut row_group_bloom_filters = + vec![BloomFilterStatistics::new(); num_row_groups]; + if let Some(predicate) = self.prepared.pruning_predicate.as_ref().map(|p| p.as_ref()) && self.prepared.loaded.prepared.enable_bloom_filter @@ -887,19 +917,86 @@ impl RowGroupsPrunedParquetOpen { mem::replace(&mut prepared.async_file_reader, replacement_reader), reader_metadata, ); - self.row_groups - .prune_by_bloom_filters( - &prepared.physical_file_schema, - &mut builder, - predicate, - &prepared.file_metrics, - ) - .await; + let parquet_columns: Vec<(String, usize, Type)> = predicate + .literal_columns() + .into_iter() + .filter_map(|column_name| { + let parquet_schema = builder.parquet_schema(); + let (column_idx, _) = parquet_column( + parquet_schema, + &prepared.physical_file_schema, + &column_name, + )?; + Some(( + column_name, + column_idx, + parquet_schema.column(column_idx).physical_type(), + )) + }) + .collect(); + + for idx in self.row_groups.row_group_indexes() { + let mut row_group_filters = + BloomFilterStatistics::with_capacity(parquet_columns.len()); + for (column_name, column_idx, physical_type) in &parquet_columns { + let bf: Sbbf = match builder + .get_row_group_column_bloom_filter(idx, *column_idx) + .await + { + Ok(Some(bf)) => bf, + Ok(None) => continue, + Err(e) => { + debug!("Ignoring error reading bloom filter: {e}"); + prepared.file_metrics.predicate_evaluation_errors.add(1); + continue; + } + }; + row_group_filters.insert(column_name, bf, *physical_type); + } + row_group_bloom_filters[idx] = row_group_filters; + } } - Ok(self) + Ok(BloomFiltersLoadedParquetOpen { + prepared: self, + row_group_bloom_filters, + }) } +} +impl BloomFiltersLoadedParquetOpen { + /// Apply bloom filter pruning using already loaded bloom filters. + fn prune_bloom_filters(mut self) -> RowGroupsPrunedParquetOpen { + let bloom_filter_eval_time = self + .prepared + .prepared + .loaded + .prepared + .file_metrics + .bloom_filter_eval_time + .clone(); + let _timer_guard = bloom_filter_eval_time.timer(); + if let Some(predicate) = self + .prepared + .prepared + .pruning_predicate + .as_ref() + .map(|p| p.as_ref()) + && self.prepared.prepared.loaded.prepared.enable_bloom_filter + && !self.prepared.row_groups.is_empty() + { + self.prepared.row_groups.prune_by_bloom_filters( + predicate, + &self.prepared.prepared.loaded.prepared.file_metrics, + &self.row_group_bloom_filters, + ); + } + + self.prepared + } +} + +impl RowGroupsPrunedParquetOpen { /// Build the final parquet stream once all pruning work is complete. fn build_stream(self) -> Result>> { let RowGroupsPrunedParquetOpen { diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 3b719e4603a95..7a2ed8f2777e3 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -28,15 +28,10 @@ use datafusion_physical_expr::PhysicalExprSimplifier; use datafusion_physical_expr::expressions::NotExpr; use datafusion_pruning::PruningPredicate; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; -use parquet::arrow::parquet_column; use parquet::basic::Type; use parquet::data_type::Decimal; use parquet::schema::types::SchemaDescriptor; -use parquet::{ - arrow::{ParquetRecordBatchStreamBuilder, async_reader::AsyncFileReader}, - bloom_filter::Sbbf, - file::metadata::RowGroupMetaData, -}; +use parquet::{bloom_filter::Sbbf, file::metadata::RowGroupMetaData}; /// Reduces the [`ParquetAccessPlan`] based on row group level metadata. /// @@ -74,6 +69,11 @@ impl RowGroupAccessPlanFilter { self.access_plan.row_group_index_iter().count() } + /// Return indexes of row groups that still need to be scanned. + pub fn row_group_indexes(&self) -> impl Iterator + '_ { + self.access_plan.row_group_index_iter() + } + /// Returns the inner access plan pub fn build(self) -> ParquetAccessPlan { self.access_plan @@ -368,62 +368,32 @@ impl RowGroupAccessPlanFilter { } } - /// Prune remaining row groups using available bloom filters and the + /// Prune remaining row groups using loaded bloom filters and the /// [`PruningPredicate`]. /// - /// Updates this set with row groups that should not be scanned + /// Updates this set with row groups that should not be scanned. + /// `row_group_bloom_filters[idx]` contains the bloom filters for the + /// parquet row group at index `idx`. /// /// # Panics - /// if the builder does not have the same number of row groups as this set - pub async fn prune_by_bloom_filters( + /// if `row_group_bloom_filters` does not have the same number of row groups as this set + pub(crate) fn prune_by_bloom_filters( &mut self, - arrow_schema: &Schema, - builder: &mut ParquetRecordBatchStreamBuilder, predicate: &PruningPredicate, metrics: &ParquetFileMetrics, + row_group_bloom_filters: &[BloomFilterStatistics], ) { // scoped timer updates on drop let _timer_guard = metrics.bloom_filter_eval_time.timer(); - assert_eq!(builder.metadata().num_row_groups(), self.access_plan.len()); - for idx in 0..self.access_plan.len() { + assert_eq!(row_group_bloom_filters.len(), self.access_plan.len()); + for (idx, stats) in row_group_bloom_filters.iter().enumerate() { if !self.access_plan.should_scan(idx) { continue; } - // Attempt to find bloom filters for filtering this row group - let literal_columns = predicate.literal_columns(); - let mut column_sbbf = HashMap::with_capacity(literal_columns.len()); - - for column_name in literal_columns { - let Some((column_idx, _field)) = - parquet_column(builder.parquet_schema(), arrow_schema, &column_name) - else { - continue; - }; - - let bf = match builder - .get_row_group_column_bloom_filter(idx, column_idx) - .await - { - Ok(Some(bf)) => bf, - Ok(None) => continue, // no bloom filter for this column - Err(e) => { - log::debug!("Ignoring error reading bloom filter: {e}"); - metrics.predicate_evaluation_errors.add(1); - continue; - } - }; - let physical_type = - builder.parquet_schema().column(column_idx).physical_type(); - - column_sbbf.insert(column_name.to_string(), (bf, physical_type)); - } - - let stats = BloomFilterStatistics { column_sbbf }; - // Can this group be pruned? - let prune_group = match predicate.prune(&stats) { + let prune_group = match predicate.prune(stats) { Ok(values) => !values[0], Err(e) => { log::debug!( @@ -443,13 +413,39 @@ impl RowGroupAccessPlanFilter { } } } -/// Implements [`PruningStatistics`] for Parquet Split Block Bloom Filters (SBBF) -struct BloomFilterStatistics { - /// Maps column name to the parquet bloom filter and parquet physical type + +/// In memory Parquet Split Block Bloom Filters (SBBF). +/// +/// This structure implements [`PruningStatistics`] and is used to prune +/// Parquet row groups and data pages based on the query predicate. +#[derive(Debug, Clone, Default)] +pub(crate) struct BloomFilterStatistics { + /// Per-column Bloom filters + /// Key: predicate column name + /// Value: + /// * [`Sbbf`] (Bloom filter), + /// * Parquet physical [`Type`] needed to evaluate literals against the filter column_sbbf: HashMap, } impl BloomFilterStatistics { + /// Create an empty [`BloomFilterStatistics`] + pub(crate) fn new() -> Self { + Default::default() + } + + /// Create an empty [`BloomFilterStatistics`] with the specified capacity + pub(crate) fn with_capacity(capacity: usize) -> Self { + Self { + column_sbbf: HashMap::with_capacity(capacity), + } + } + + /// Add a Bloom filter and type for the specified column + pub(crate) fn insert(&mut self, column: impl Into, sbbf: Sbbf, ty: Type) { + self.column_sbbf.insert(column.into(), (sbbf, ty)); + } + /// Helper function for checking if [`Sbbf`] filter contains [`ScalarValue`]. /// /// In case the type of scalar is not supported, returns `true`, assuming that the @@ -662,6 +658,7 @@ mod tests { use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use object_store::ObjectStoreExt; use parquet::arrow::ArrowSchemaConverter; + use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::basic::LogicalType; use parquet::data_type::{ByteArray, FixedLenByteArray}; @@ -1784,14 +1781,52 @@ mod tests { let access_plan = ParquetAccessPlan::new_all(builder.metadata().num_row_groups()); let mut pruned_row_groups = RowGroupAccessPlanFilter::new(access_plan); - pruned_row_groups - .prune_by_bloom_filters( - pruning_predicate.schema(), - &mut builder, - pruning_predicate, - &file_metrics, - ) - .await; + let literal_columns = pruning_predicate.literal_columns(); + let parquet_columns: Vec<_> = literal_columns + .into_iter() + .filter_map(|column_name| { + let (column_idx, _) = parquet::arrow::parquet_column( + builder.parquet_schema(), + pruning_predicate.schema(), + &column_name, + )?; + Some(( + column_name.to_string(), + column_idx, + builder.parquet_schema().column(column_idx).physical_type(), + )) + }) + .collect::>(); + let mut row_group_bloom_filters = + Vec::with_capacity(builder.metadata().num_row_groups()); + row_group_bloom_filters.resize_with( + builder.metadata().num_row_groups(), + BloomFilterStatistics::new, + ); + for idx in pruned_row_groups.row_group_indexes() { + let mut column_sbbf = HashMap::with_capacity(parquet_columns.len()); + for (column_name, column_idx, physical_type) in &parquet_columns { + let bf = match builder + .get_row_group_column_bloom_filter(idx, *column_idx) + .await + { + Ok(Some(bf)) => bf, + Ok(None) => continue, + Err(e) => { + log::debug!("Ignoring error reading bloom filter: {e}"); + file_metrics.predicate_evaluation_errors.add(1); + continue; + } + }; + column_sbbf.insert(column_name.clone(), (bf, *physical_type)); + } + row_group_bloom_filters[idx] = BloomFilterStatistics { column_sbbf }; + } + pruned_row_groups.prune_by_bloom_filters( + pruning_predicate, + &file_metrics, + &row_group_bloom_filters, + ); Ok(pruned_row_groups) }