Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 120 additions & 23 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use crate::page_filter::PagePruningAccessPlanFilter;
use crate::row_filter::build_projection_read_plan;
use crate::row_group_filter::RowGroupAccessPlanFilter;
use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter};
use crate::{
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
Expand Down Expand Up @@ -70,7 +70,10 @@ use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::parquet_column;
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
use parquet::basic::Type;
use parquet::bloom_filter::Sbbf;
use parquet::errors::ParquetError;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};

Expand Down Expand Up @@ -163,6 +166,9 @@ pub(super) struct ParquetOpener {
/// PruneWithStatistics
/// |
/// v
/// LoadBloomFilters
/// |
/// v
/// PruneWithBloomFilters
/// |
/// v
Expand Down Expand Up @@ -196,10 +202,10 @@ enum ParquetOpenState {
LoadPageIndex(BoxFuture<'static, Result<FiltersPreparedParquetOpen>>),
/// Pruning Row Groups
PruneWithStatistics(Box<FiltersPreparedParquetOpen>),
/// Pruning with Bloom Filters
///
/// TODO: split state as this currently does both I/O and CPU work
PruneWithBloomFilters(BoxFuture<'static, Result<RowGroupsPrunedParquetOpen>>),
/// Loading bloom filters required for row-group pruning
LoadBloomFilters(BoxFuture<'static, Result<BloomFiltersLoadedParquetOpen>>),
/// Pruning with preloaded Bloom Filters
PruneWithBloomFilters(Box<BloomFiltersLoadedParquetOpen>),
/// Builds the final reader stream
///
/// TODO: split state as this currently does both I/O and CPU work.
Expand Down Expand Up @@ -273,6 +279,17 @@ struct RowGroupsPrunedParquetOpen {
row_groups: RowGroupAccessPlanFilter,
}

/// State of [`ParquetOpenState`]
///
/// Result of loading bloom filters needed for row-group pruning.
///
/// Produced by `RowGroupsPrunedParquetOpen::load_bloom_filters` and consumed by
/// `prune_bloom_filters`, which hands the inner [`RowGroupsPrunedParquetOpen`]
/// back once pruning has been applied.
struct BloomFiltersLoadedParquetOpen {
/// State carried over from the preceding row-group pruning step.
prepared: RowGroupsPrunedParquetOpen,
/// Bloom filters loaded for each row group that remains under consideration.
///
/// Indexed by parquet row-group index: the vector is sized to the number of
/// row groups in the file metadata, and entries that were not filled during
/// loading keep the value produced by `BloomFilterStatistics::new()`.
row_group_bloom_filters: Vec<BloomFilterStatistics>,
}

/// Implements state machine described in [`ParquetOpenState`]
struct ParquetOpenFuture {
state: ParquetOpenState,
Expand Down Expand Up @@ -354,13 +371,16 @@ impl ParquetOpenState {
}
ParquetOpenState::PruneWithStatistics(prepared) => {
let prepared_row_groups = prepared.prune_row_groups()?;
Ok(ParquetOpenState::PruneWithBloomFilters(
prepared_row_groups.prune_bloom_filters().boxed(),
Ok(ParquetOpenState::LoadBloomFilters(
prepared_row_groups.load_bloom_filters().boxed(),
))
}
ParquetOpenState::PruneWithBloomFilters(future) => {
Ok(ParquetOpenState::PruneWithBloomFilters(future))
ParquetOpenState::LoadBloomFilters(future) => {
Ok(ParquetOpenState::LoadBloomFilters(future))
}
ParquetOpenState::PruneWithBloomFilters(loaded) => Ok(
ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())),
),
ParquetOpenState::BuildStream(prepared) => {
Ok(ParquetOpenState::Ready(prepared.build_stream()?))
}
Expand Down Expand Up @@ -413,13 +433,13 @@ impl Future for ParquetOpenFuture {
}
};
}
ParquetOpenState::PruneWithBloomFilters(mut future) => {
ParquetOpenState::LoadBloomFilters(mut future) => {
state = match future.poll_unpin(cx) {
Poll::Ready(result) => {
ParquetOpenState::BuildStream(Box::new(result?))
ParquetOpenState::PruneWithBloomFilters(Box::new(result?))
}
Poll::Pending => {
self.state = ParquetOpenState::PruneWithBloomFilters(future);
self.state = ParquetOpenState::LoadBloomFilters(future);
return Poll::Pending;
}
};
Expand All @@ -438,6 +458,7 @@ impl Future for ParquetOpenFuture {
ParquetOpenState::PruneFile(_) => {}
ParquetOpenState::PrepareFilters(_) => {}
ParquetOpenState::PruneWithStatistics(_) => {}
ParquetOpenState::PruneWithBloomFilters(_) => {}
ParquetOpenState::BuildStream(_) => {}
};

Expand Down Expand Up @@ -862,8 +883,17 @@ impl FiltersPreparedParquetOpen {
}

impl RowGroupsPrunedParquetOpen {
/// Apply bloom filter pruning when it is enabled and a pruning predicate exists.
async fn prune_bloom_filters(mut self) -> Result<Self> {
/// Load bloom filters needed for pruning when enabled and a pruning predicate exists.
async fn load_bloom_filters(mut self) -> Result<BloomFiltersLoadedParquetOpen> {
let num_row_groups = self
.prepared
.loaded
.reader_metadata
.metadata()
.num_row_groups();
let mut row_group_bloom_filters =
vec![BloomFilterStatistics::new(); num_row_groups];

if let Some(predicate) =
self.prepared.pruning_predicate.as_ref().map(|p| p.as_ref())
&& self.prepared.loaded.prepared.enable_bloom_filter
Expand All @@ -887,19 +917,86 @@ impl RowGroupsPrunedParquetOpen {
mem::replace(&mut prepared.async_file_reader, replacement_reader),
reader_metadata,
);
self.row_groups
.prune_by_bloom_filters(
&prepared.physical_file_schema,
&mut builder,
predicate,
&prepared.file_metrics,
)
.await;
let parquet_columns: Vec<(String, usize, Type)> = predicate
.literal_columns()
.into_iter()
.filter_map(|column_name| {
let parquet_schema = builder.parquet_schema();
let (column_idx, _) = parquet_column(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice if we could get this to also support struct columns / struct filters cc @friendlymatthew

parquet_schema,
&prepared.physical_file_schema,
&column_name,
)?;
Some((
column_name,
column_idx,
parquet_schema.column(column_idx).physical_type(),
))
})
.collect();

for idx in self.row_groups.row_group_indexes() {
let mut row_group_filters =
BloomFilterStatistics::with_capacity(parquet_columns.len());
for (column_name, column_idx, physical_type) in &parquet_columns {
let bf: Sbbf = match builder
.get_row_group_column_bloom_filter(idx, *column_idx)
.await
{
Ok(Some(bf)) => bf,
Ok(None) => continue,
Err(e) => {
debug!("Ignoring error reading bloom filter: {e}");
prepared.file_metrics.predicate_evaluation_errors.add(1);
continue;
}
};
row_group_filters.insert(column_name, bf, *physical_type);
}
row_group_bloom_filters[idx] = row_group_filters;
}
}

Ok(self)
Ok(BloomFiltersLoadedParquetOpen {
prepared: self,
row_group_bloom_filters,
})
}
}

impl BloomFiltersLoadedParquetOpen {
/// Apply bloom filter pruning using already loaded bloom filters.
fn prune_bloom_filters(mut self) -> RowGroupsPrunedParquetOpen {
let bloom_filter_eval_time = self
.prepared
.prepared
.loaded
.prepared
.file_metrics
.bloom_filter_eval_time
.clone();
let _timer_guard = bloom_filter_eval_time.timer();
if let Some(predicate) = self
.prepared
.prepared
.pruning_predicate
.as_ref()
.map(|p| p.as_ref())
&& self.prepared.prepared.loaded.prepared.enable_bloom_filter
&& !self.prepared.row_groups.is_empty()
{
self.prepared.row_groups.prune_by_bloom_filters(
predicate,
&self.prepared.prepared.loaded.prepared.file_metrics,
&self.row_group_bloom_filters,
);
}

self.prepared
}
}

impl RowGroupsPrunedParquetOpen {
/// Build the final parquet stream once all pruning work is complete.
fn build_stream(self) -> Result<BoxStream<'static, Result<RecordBatch>>> {
let RowGroupsPrunedParquetOpen {
Expand Down
Loading
Loading