diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 6f38df46e3d2e..5f165f9647a93 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -892,8 +892,11 @@ mod tests { .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); - assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0); - assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0); + assert_eq!( + get_value_or_zero(&metrics, "predicate_evaluation_errors"), + 0 + ); + assert_eq!(get_value_or_zero(&metrics, "pushdown_rows_matched"), 0); assert_eq!(rt.batches.unwrap().len(), 0); // Predicate should prune no row groups @@ -905,8 +908,11 @@ mod tests { .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); - assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0); - assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0); + assert_eq!( + get_value_or_zero(&metrics, "predicate_evaluation_errors"), + 0 + ); + assert_eq!(get_value_or_zero(&metrics, "pushdown_rows_matched"), 0); let read = rt .batches .unwrap() @@ -934,7 +940,10 @@ mod tests { .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); - assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0); + assert_eq!( + get_value_or_zero(&metrics, "predicate_evaluation_errors"), + 0 + ); assert_eq!(rt.batches.unwrap().len(), 0); // Predicate should prune no row groups @@ -946,7 +955,10 @@ mod tests { .await; // There should be no predicate evaluation errors let metrics = rt.parquet_exec.metrics().unwrap(); - assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0); + assert_eq!( + get_value_or_zero(&metrics, "predicate_evaluation_errors"), + 0 + ); let read = rt .batches .unwrap() @@ -986,7 +998,10 @@ mod tests { .await; // There should be no predicate evaluation errors and we keep 1 row let metrics = rt.parquet_exec.metrics().unwrap(); - assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0); + assert_eq!( + get_value_or_zero(&metrics, "predicate_evaluation_errors"), + 0 + ); let read = rt .batches .unwrap() @@ -994,7 +1009,10 @@ mod tests { .map(|b| b.num_rows()) .sum::(); assert_eq!(read, 1, "Expected 1 rows to match the predicate"); - assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0); + assert_eq!( + get_value_or_zero(&metrics, "row_groups_pruned_statistics"), + 0 + ); assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2); assert_eq!(get_value(&metrics, "page_index_pages_pruned"), 1); assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1); @@ -1012,7 +1030,10 @@ mod tests { .await; // There should be no predicate evaluation errors and we keep 0 rows let metrics = rt.parquet_exec.metrics().unwrap(); - assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0); + assert_eq!( + get_value_or_zero(&metrics, "predicate_evaluation_errors"), + 0 + ); let read = rt .batches .unwrap() @@ -2147,6 +2168,18 @@ mod tests { } } + fn get_value_or_zero(metrics: &MetricsSet, metric_name: &str) -> usize { + metrics + .sum_by_name(metric_name) + .map(|v| match v { + MetricValue::PruningMetrics { + pruning_metrics, .. + } => pruning_metrics.pruned(), + _ => v.as_usize(), + }) + .unwrap_or(0) + } + fn get_pruning_metric(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) { match metrics.sum_by_name(metric_name) { Some(MetricValue::PruningMetrics { diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs index 31be6fd979fd6..898082434da2e 100644 --- a/datafusion/core/tests/parquet/external_access_plan.rs +++ b/datafusion/core/tests/parquet/external_access_plan.rs @@ -84,7 +84,7 @@ async fn skip_all() { .await; // Verify that skipping all row groups skips reading any data at all - let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap(); + let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap_or(0); assert_eq!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",); } diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index e6266b2c088d7..b176b02b135b6 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -544,9 +544,17 @@ impl<'a> TestCase<'a> { PushdownExpected::None }; - let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned"); + let pushdown_rows_pruned = match pushdown_expected { + PushdownExpected::None => { + metric_value_or_zero(&metrics, "pushdown_rows_pruned") + } + PushdownExpected::Some => { + expect_metric_value(&metrics, "pushdown_rows_pruned") + } + }; + let pushdown_rows_matched = + metric_value_or_zero(&metrics, "pushdown_rows_matched"); println!(" pushdown_rows_pruned: {pushdown_rows_pruned}"); - let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched"); println!(" pushdown_rows_matched: {pushdown_rows_matched}"); match pushdown_expected { @@ -562,11 +570,6 @@ impl<'a> TestCase<'a> { } }; - let (page_index_rows_pruned, page_index_rows_matched) = - get_pruning_metrics(&metrics, "page_index_rows_pruned"); - println!(" page_index_rows_pruned: {page_index_rows_pruned}"); - println!(" page_index_rows_matched: {page_index_rows_matched}"); - let page_index_filtering_expected = if scan_options.enable_page_index { self.page_index_filtering_expected } else { @@ -577,9 +580,17 @@ impl<'a> TestCase<'a> { match page_index_filtering_expected { PageIndexFilteringExpected::None => { + let (page_index_rows_pruned, page_index_rows_matched) = + pruning_metrics_or_zero(&metrics, "page_index_rows_pruned"); + println!(" page_index_rows_pruned: {page_index_rows_pruned}"); + println!(" page_index_rows_matched: {page_index_rows_matched}"); assert_eq!(page_index_rows_pruned, 0); } PageIndexFilteringExpected::Some => { + let (page_index_rows_pruned, page_index_rows_matched) = + expect_pruning_metrics(&metrics, "page_index_rows_pruned"); + println!(" page_index_rows_pruned: {page_index_rows_pruned}"); + println!(" page_index_rows_matched: {page_index_rows_matched}"); assert!( page_index_rows_pruned > 0, "Expected to filter rows via page index but none were", @@ -591,7 +602,19 @@ impl<'a> TestCase<'a> { } } -fn get_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) { +fn pruning_metrics_or_zero(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) { + get_pruning_metrics(metrics, metric_name, true) +} + +fn expect_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) { + get_pruning_metrics(metrics, metric_name, false) +} + +fn get_pruning_metrics( + metrics: &MetricsSet, + metric_name: &str, + allow_missing: bool, +) -> (usize, usize) { match metrics.sum_by_name(metric_name) { Some(MetricValue::PruningMetrics { pruning_metrics, .. @@ -599,21 +622,35 @@ fn get_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize Some(_) => { panic!("Metric '{metric_name}' is not a pruning metric in\n\n{metrics:#?}") } - None => panic!( - "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}" - ), + None if allow_missing => (0, 0), + None => panic!("Expected metric '{metric_name}' not found in\n\n{metrics:#?}"), } } -fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { +fn metric_value_or_zero(metrics: &MetricsSet, metric_name: &str) -> usize { + get_value(metrics, metric_name, true) +} + +fn expect_metric_value(metrics: &MetricsSet, metric_name: &str) -> usize { + get_value(metrics, metric_name, false) +} + +fn metric_value_for_expected( + metrics: &MetricsSet, + metric_name: &str, + expected: usize, +) -> usize { + get_value(metrics, metric_name, expected == 0) +} + +fn get_value(metrics: &MetricsSet, metric_name: &str, allow_missing: bool) -> usize { match metrics.sum_by_name(metric_name) { Some(MetricValue::PruningMetrics { pruning_metrics, .. }) => pruning_metrics.pruned(), Some(v) => v.as_usize(), - None => panic!( - "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}" - ), + None if allow_missing => 0, + None => panic!("Expected metric '{metric_name}' not found in\n\n{metrics:#?}"), } } @@ -736,11 +773,19 @@ impl PredicateCacheTest { // verify the predicate cache metrics assert_eq!( - get_value(&metrics, "predicate_cache_inner_records"), + metric_value_for_expected( + &metrics, + "predicate_cache_inner_records", + expected_inner_records + ), expected_inner_records ); assert_eq!( - get_value(&metrics, "predicate_cache_records"), + metric_value_for_expected( + &metrics, + "predicate_cache_records", + expected_records + ), expected_records ); Ok(()) diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 0e936a79ebe9f..98a9c4caf650c 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -267,6 +267,20 @@ impl TestOutput { } } +pub(crate) fn zero_if_metric_absent( + actual: Option, + expected: Option, +) -> Option { + // Lazy parquet metrics may represent zero as an absent metric. Tests that + // specifically enforce lazy-skipping zero metrics live with the metrics + // implementation; these integration tests only compare scan behavior. + if expected == Some(0) { + Some(actual.unwrap_or(0)) + } else { + actual + } +} + /// Creates an execution context that has an external table "t" /// registered pointing at a parquet file made with `make_test_file` /// and the appropriate scenario diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index a41803191ad05..36fa6467e6374 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use crate::parquet::Unit::Page; -use crate::parquet::{ContextWithParquet, Scenario}; +use crate::parquet::{ContextWithParquet, Scenario, zero_if_metric_absent}; use arrow::array::{Int32Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; @@ -238,8 +238,14 @@ async fn test_prune( .await; println!("{}", output.description()); - assert_eq!(output.predicate_evaluation_errors(), expected_errors); - assert_eq!(output.row_pages_pruned(), expected_row_pages_pruned); + assert_eq!( + zero_if_metric_absent(output.predicate_evaluation_errors(), expected_errors), + expected_errors + ); + assert_eq!( + zero_if_metric_absent(output.row_pages_pruned(), expected_row_pages_pruned), + expected_row_pages_pruned + ); assert_eq!( output.result_rows, expected_results, @@ -360,7 +366,10 @@ async fn prune_date64() { println!("{}", output.description()); // This should prune out groups without error - assert_eq!(output.predicate_evaluation_errors(), Some(0)); + assert_eq!( + zero_if_metric_absent(output.predicate_evaluation_errors(), Some(0)), + Some(0) + ); assert_eq!(output.row_pages_pruned(), Some(15)); assert_eq!(output.result_rows, 1, "{}", output.description()); } diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index 441d1af3e96fd..07e58496d129b 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -27,7 +27,7 @@ use datafusion_common::{DataFusionError, ScalarValue}; use itertools::Itertools; use crate::parquet::Unit::RowGroup; -use crate::parquet::{ContextWithParquet, Scenario}; +use crate::parquet::{ContextWithParquet, Scenario, zero_if_metric_absent}; use datafusion_expr::{col, lit}; struct RowGroupPruningTest { scenario: Scenario, @@ -136,33 +136,51 @@ impl RowGroupPruningTest { println!("{}", output.description()); assert_eq!( - output.predicate_evaluation_errors(), + zero_if_metric_absent( + output.predicate_evaluation_errors(), + self.expected_errors + ), self.expected_errors, "mismatched predicate_evaluation error" ); assert_eq!( - output.row_groups_matched_statistics(), + zero_if_metric_absent( + output.row_groups_matched_statistics(), + self.expected_row_group_matched_by_statistics + ), self.expected_row_group_matched_by_statistics, "mismatched row_groups_matched_statistics", ); assert_eq!( - output.row_groups_pruned_statistics(), + zero_if_metric_absent( + output.row_groups_pruned_statistics(), + self.expected_row_group_pruned_by_statistics + ), self.expected_row_group_pruned_by_statistics, "mismatched row_groups_pruned_statistics", ); assert_eq!( - output.files_ranges_pruned_statistics(), + zero_if_metric_absent( + output.files_ranges_pruned_statistics(), + self.expected_files_pruned_by_statistics + ), self.expected_files_pruned_by_statistics, "mismatched files_ranges_pruned_statistics", ); let bloom_filter_metrics = output.row_groups_bloom_filter(); assert_eq!( - bloom_filter_metrics.as_ref().map(|pm| pm.total_matched()), + zero_if_metric_absent( + bloom_filter_metrics.as_ref().map(|pm| pm.total_matched()), + self.expected_row_group_matched_by_bloom_filter + ), self.expected_row_group_matched_by_bloom_filter, "mismatched row_groups_matched_bloom_filter", ); assert_eq!( - bloom_filter_metrics.map(|pm| pm.total_pruned()), + zero_if_metric_absent( + bloom_filter_metrics.map(|pm| pm.total_pruned()), + self.expected_row_group_pruned_by_bloom_filter + ), self.expected_row_group_pruned_by_bloom_filter, "mismatched row_groups_pruned_bloom_filter", ); @@ -196,32 +214,50 @@ impl RowGroupPruningTest { println!("{}", output.description()); assert_eq!( - output.predicate_evaluation_errors(), + zero_if_metric_absent( + output.predicate_evaluation_errors(), + self.expected_errors + ), self.expected_errors, "mismatched predicate_evaluation error" ); assert_eq!( - output.row_groups_matched_statistics(), + zero_if_metric_absent( + output.row_groups_matched_statistics(), + self.expected_row_group_matched_by_statistics + ), self.expected_row_group_matched_by_statistics, "mismatched row_groups_matched_statistics", ); assert_eq!( - output.row_groups_fully_matched_statistics(), + zero_if_metric_absent( + output.row_groups_fully_matched_statistics(), + self.expected_row_group_fully_matched_by_statistics + ), self.expected_row_group_fully_matched_by_statistics, "mismatched row_groups_fully_matched_statistics", ); assert_eq!( - output.row_groups_pruned_statistics(), + zero_if_metric_absent( + output.row_groups_pruned_statistics(), + self.expected_row_group_pruned_by_statistics + ), self.expected_row_group_pruned_by_statistics, "mismatched row_groups_pruned_statistics", ); assert_eq!( - output.files_ranges_pruned_statistics(), + zero_if_metric_absent( + output.files_ranges_pruned_statistics(), + self.expected_files_pruned_by_statistics + ), self.expected_files_pruned_by_statistics, "mismatched files_ranges_pruned_statistics", ); assert_eq!( - output.limit_pruned_row_groups(), + zero_if_metric_absent( + output.limit_pruned_row_groups(), + self.expected_limit_pruned_row_groups + ), self.expected_limit_pruned_row_groups, "mismatched limit_pruned_row_groups", ); @@ -343,7 +379,10 @@ async fn prune_date64() { println!("{}", output.description()); // This should prune out groups without error - assert_eq!(output.predicate_evaluation_errors(), Some(0)); + assert_eq!( + zero_if_metric_absent(output.predicate_evaluation_errors(), Some(0)), + Some(0) + ); // 'dates' table has 4 row groups, and only the first one is matched by the predicate assert_eq!(output.row_groups_matched_statistics(), Some(1)); assert_eq!(output.row_groups_pruned_statistics(), Some(3)); @@ -383,9 +422,15 @@ async fn prune_disabled() { println!("{}", output.description()); // This should not prune any - assert_eq!(output.predicate_evaluation_errors(), Some(0)); + assert_eq!( + zero_if_metric_absent(output.predicate_evaluation_errors(), Some(0)), + Some(0) + ); assert_eq!(output.row_groups_matched(), Some(4)); - assert_eq!(output.row_groups_pruned(), Some(0)); + assert_eq!( + zero_if_metric_absent(output.row_groups_pruned(), Some(0)), + Some(0) + ); assert_eq!( output.result_rows, expected_rows, diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs index 262dde024a527..fb68b61c3ff2f 100644 --- a/datafusion/datasource-parquet/src/metrics.rs +++ b/datafusion/datasource-parquet/src/metrics.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use datafusion_physical_plan::metrics::{ Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType, - PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, + MetricValue, PruningMetrics, RatioMergeStrategy, RatioMetrics, Time, }; /// Stores metrics about the parquet execution for a particular parquet file. @@ -91,6 +93,9 @@ pub struct ParquetFileMetrics { /// number of rows that were stored in the cache after evaluating predicates /// reused for the output. pub predicate_cache_records: Gauge, + /// Keeps the lazy registration guard alive. The guard's [`Drop`] impl is + /// what registers the non-zero metric handles into the plan metrics set. + _registration: Arc, } impl ParquetFileMetrics { @@ -100,117 +105,68 @@ impl ParquetFileMetrics { filename: &str, metrics: &ExecutionPlanMetricsSet, ) -> Self { - // ----------------------- - // 'summary' level metrics - // ----------------------- - let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_type(MetricType::Summary) - .pruning_metrics("row_groups_pruned_bloom_filter", partition); - - let limit_pruned_row_groups = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_type(MetricType::Summary) - .pruning_metrics("limit_pruned_row_groups", partition); - - let row_groups_pruned_statistics = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_type(MetricType::Summary) - .pruning_metrics("row_groups_pruned_statistics", partition); - - let page_index_pages_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_type(MetricType::Summary) - .pruning_metrics("page_index_pages_pruned", partition); - - let bytes_scanned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_type(MetricType::Summary) - .with_category(MetricCategory::Bytes) - .counter("bytes_scanned", partition); - - let metadata_load_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_type(MetricType::Summary) - .subset_time("metadata_load_time", partition); - - let files_ranges_pruned_statistics = MetricBuilder::new(metrics) - .with_type(MetricType::Summary) - .pruning_metrics("files_ranges_pruned_statistics", partition); - - let scan_efficiency_ratio = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_type(MetricType::Summary) - .ratio_metrics_with_strategy( - "scan_efficiency_ratio", - partition, - RatioMergeStrategy::AddPartSetTotal, - ); - - // ----------------------- - // 'dev' level metrics - // ----------------------- - let predicate_evaluation_errors = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_category(MetricCategory::Rows) - .counter("predicate_evaluation_errors", partition); - - let pushdown_rows_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_category(MetricCategory::Rows) - .counter("pushdown_rows_pruned", partition); - let pushdown_rows_matched = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_category(MetricCategory::Rows) - .counter("pushdown_rows_matched", partition); - - let row_pushdown_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .subset_time("row_pushdown_eval_time", partition); - let statistics_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .subset_time("statistics_eval_time", partition); - let bloom_filter_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .subset_time("bloom_filter_eval_time", partition); - - let page_index_eval_time = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .subset_time("page_index_eval_time", partition); - - let page_index_rows_pruned = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .pruning_metrics("page_index_rows_pruned", partition); - - let predicate_cache_inner_records = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_category(MetricCategory::Rows) - .gauge("predicate_cache_inner_records", partition); - - let predicate_cache_records = MetricBuilder::new(metrics) - .with_new_label("filename", filename.to_string()) - .with_category(MetricCategory::Rows) - .gauge("predicate_cache_records", partition); + let files_ranges_pruned_statistics = PruningMetrics::new(); + let predicate_evaluation_errors = Count::new(); + let row_groups_pruned_bloom_filter = PruningMetrics::new(); + let row_groups_pruned_statistics = PruningMetrics::new(); + let limit_pruned_row_groups = PruningMetrics::new(); + let bytes_scanned = Count::new(); + let pushdown_rows_pruned = Count::new(); + let pushdown_rows_matched = Count::new(); + let row_pushdown_eval_time = Time::new(); + let page_index_rows_pruned = PruningMetrics::new(); + let page_index_pages_pruned = PruningMetrics::new(); + let statistics_eval_time = Time::new(); + let bloom_filter_eval_time = Time::new(); + let page_index_eval_time = Time::new(); + let metadata_load_time = Time::new(); + let scan_efficiency_ratio = + RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal); + let predicate_cache_inner_records = Gauge::new(); + let predicate_cache_records = Gauge::new(); Self { - files_ranges_pruned_statistics, - predicate_evaluation_errors, - row_groups_pruned_bloom_filter, - row_groups_pruned_statistics, - limit_pruned_row_groups, - bytes_scanned, - pushdown_rows_pruned, - pushdown_rows_matched, - row_pushdown_eval_time, - page_index_rows_pruned, - page_index_pages_pruned, - statistics_eval_time, - bloom_filter_eval_time, - page_index_eval_time, - metadata_load_time, - scan_efficiency_ratio, - predicate_cache_inner_records, - predicate_cache_records, + files_ranges_pruned_statistics: files_ranges_pruned_statistics.clone(), + predicate_evaluation_errors: predicate_evaluation_errors.clone(), + row_groups_pruned_bloom_filter: row_groups_pruned_bloom_filter.clone(), + row_groups_pruned_statistics: row_groups_pruned_statistics.clone(), + limit_pruned_row_groups: limit_pruned_row_groups.clone(), + bytes_scanned: bytes_scanned.clone(), + pushdown_rows_pruned: pushdown_rows_pruned.clone(), + pushdown_rows_matched: pushdown_rows_matched.clone(), + row_pushdown_eval_time: row_pushdown_eval_time.clone(), + page_index_rows_pruned: page_index_rows_pruned.clone(), + page_index_pages_pruned: page_index_pages_pruned.clone(), + statistics_eval_time: statistics_eval_time.clone(), + bloom_filter_eval_time: bloom_filter_eval_time.clone(), + page_index_eval_time: page_index_eval_time.clone(), + metadata_load_time: metadata_load_time.clone(), + scan_efficiency_ratio: scan_efficiency_ratio.clone(), + predicate_cache_inner_records: predicate_cache_inner_records.clone(), + predicate_cache_records: predicate_cache_records.clone(), + _registration: Arc::new(ParquetFileMetricRegistration { + metrics: metrics.clone(), + partition, + filename: filename.to_string(), + files_ranges_pruned_statistics, + predicate_evaluation_errors, + row_groups_pruned_bloom_filter, + limit_pruned_row_groups, + row_groups_pruned_statistics, + bytes_scanned, + pushdown_rows_pruned, + pushdown_rows_matched, + row_pushdown_eval_time, + page_index_rows_pruned, + page_index_pages_pruned, + statistics_eval_time, + bloom_filter_eval_time, + page_index_eval_time, + metadata_load_time, + scan_efficiency_ratio, + predicate_cache_inner_records, + predicate_cache_records, + }), } } @@ -238,3 +194,365 @@ impl ParquetFileMetrics { count.add(n); } } + +#[derive(Debug)] +struct ParquetFileMetricRegistration { + metrics: ExecutionPlanMetricsSet, + partition: usize, + filename: String, + files_ranges_pruned_statistics: PruningMetrics, + predicate_evaluation_errors: Count, + row_groups_pruned_bloom_filter: PruningMetrics, + limit_pruned_row_groups: PruningMetrics, + row_groups_pruned_statistics: PruningMetrics, + bytes_scanned: Count, + pushdown_rows_pruned: Count, + pushdown_rows_matched: Count, + row_pushdown_eval_time: Time, + page_index_rows_pruned: PruningMetrics, + page_index_pages_pruned: PruningMetrics, + statistics_eval_time: Time, + bloom_filter_eval_time: Time, + page_index_eval_time: Time, + metadata_load_time: Time, + scan_efficiency_ratio: RatioMetrics, + predicate_cache_inner_records: Gauge, + predicate_cache_records: Gauge, +} + +impl Drop for ParquetFileMetricRegistration { + fn drop(&mut self) { + self.register_pruning( + "files_ranges_pruned_statistics", + &self.files_ranges_pruned_statistics, + MetricType::Summary, + false, + ); + self.register_count( + "predicate_evaluation_errors", + &self.predicate_evaluation_errors, + MetricType::Dev, + MetricCategory::Rows, + true, + ); + self.register_pruning( + "row_groups_pruned_bloom_filter", + &self.row_groups_pruned_bloom_filter, + MetricType::Summary, + true, + ); + self.register_pruning( + "limit_pruned_row_groups", + &self.limit_pruned_row_groups, + MetricType::Summary, + true, + ); + self.register_pruning( + "row_groups_pruned_statistics", + &self.row_groups_pruned_statistics, + MetricType::Summary, + true, + ); + self.register_count( + "bytes_scanned", + &self.bytes_scanned, + MetricType::Summary, + MetricCategory::Bytes, + true, + ); + self.register_count( + "pushdown_rows_pruned", + &self.pushdown_rows_pruned, + MetricType::Dev, + MetricCategory::Rows, + true, + ); + self.register_count( + "pushdown_rows_matched", + &self.pushdown_rows_matched, + MetricType::Dev, + MetricCategory::Rows, + true, + ); + self.register_time( + "row_pushdown_eval_time", + &self.row_pushdown_eval_time, + MetricType::Dev, + true, + ); + self.register_pruning( + "page_index_rows_pruned", + &self.page_index_rows_pruned, + MetricType::Dev, + true, + ); + self.register_pruning( + "page_index_pages_pruned", + &self.page_index_pages_pruned, + MetricType::Summary, + true, + ); + self.register_time( + "statistics_eval_time", + &self.statistics_eval_time, + MetricType::Dev, + true, + ); + self.register_time( + "bloom_filter_eval_time", + &self.bloom_filter_eval_time, + MetricType::Dev, + true, + ); + self.register_time( + "page_index_eval_time", + &self.page_index_eval_time, + MetricType::Dev, + true, + ); + self.register_time( + "metadata_load_time", + &self.metadata_load_time, + MetricType::Summary, + true, + ); + self.register_ratio( + "scan_efficiency_ratio", + &self.scan_efficiency_ratio, + MetricType::Summary, + true, + ); + self.register_gauge( + "predicate_cache_inner_records", + &self.predicate_cache_inner_records, + MetricType::Dev, + MetricCategory::Rows, + true, + ); + self.register_gauge( + "predicate_cache_records", + &self.predicate_cache_records, + MetricType::Dev, + MetricCategory::Rows, + true, + ); + } +} + +impl ParquetFileMetricRegistration { + fn builder( + &self, + metric_type: MetricType, + metric_category: MetricCategory, + filename_label: bool, + ) -> MetricBuilder<'_> { + let builder = MetricBuilder::new(&self.metrics) + .with_type(metric_type) + .with_category(metric_category); + if filename_label { + builder.with_new_label("filename", self.filename.clone()) + } else { + builder + } + } + + fn register_count( + &self, + name: &'static str, + count: &Count, + metric_type: MetricType, + metric_category: MetricCategory, + filename_label: bool, + ) { + if count.value() == 0 { + return; + } + + self.builder(metric_type, metric_category, filename_label) + .with_partition(self.partition) + .build(MetricValue::Count { + name: name.into(), + count: count.clone(), + }); + } + + fn register_gauge( + &self, + name: &'static str, + gauge: &Gauge, + metric_type: MetricType, + metric_category: MetricCategory, + filename_label: bool, + ) { + if gauge.value() == 0 { + return; + } + + self.builder(metric_type, metric_category, filename_label) + .with_partition(self.partition) + .build(MetricValue::Gauge { + name: name.into(), + gauge: gauge.clone(), + }); + } + + fn register_time( + &self, + name: &'static str, + time: &Time, + metric_type: MetricType, + filename_label: bool, + ) { + if time.value() == 0 { + return; + } + + self.builder(metric_type, MetricCategory::Timing, filename_label) + .with_partition(self.partition) + .build(MetricValue::Time { + name: name.into(), + time: time.clone(), + }); + } + + fn register_pruning( + &self, + name: &'static str, + pruning_metrics: &PruningMetrics, + metric_type: MetricType, + filename_label: bool, + ) { + if pruning_metrics.pruned() == 0 + && pruning_metrics.matched() == 0 + && pruning_metrics.fully_matched() == 0 + { + return; + } + + self.builder(metric_type, MetricCategory::Rows, filename_label) + .with_partition(self.partition) + .build(MetricValue::PruningMetrics { + name: name.into(), + pruning_metrics: pruning_metrics.clone(), + }); + } + + fn register_ratio( + &self, + name: &'static str, + ratio_metrics: &RatioMetrics, + metric_type: MetricType, + filename_label: bool, + ) { + if ratio_metrics.part() == 0 && ratio_metrics.total() == 0 { + return; + } + + self.builder(metric_type, MetricCategory::Rows, filename_label) + .with_partition(self.partition) + .build(MetricValue::Ratio { + name: name.into(), + ratio_metrics: ratio_metrics.clone(), + }); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + #[test] + fn parquet_file_metrics_skip_zero_metrics_on_drop() { + let metrics = ExecutionPlanMetricsSet::new(); + + { + let _file_metrics = ParquetFileMetrics::new(0, "file.parquet", &metrics); + } + + assert!( + metrics.clone_inner().iter().next().is_none(), + "zero-value parquet file metrics should not be registered" + ); + } + + #[test] + fn parquet_file_metrics_register_non_zero_metrics_on_drop() { + let metrics = ExecutionPlanMetricsSet::new(); + + { + let file_metrics = ParquetFileMetrics::new(0, "file.parquet", &metrics); + assert!(metrics.clone_inner().sum_by_name("bytes_scanned").is_none()); + + file_metrics.bytes_scanned.add(42); + file_metrics.predicate_cache_records.set(9); + file_metrics + .metadata_load_time + .add_duration(Duration::from_nanos(7)); + file_metrics.row_groups_pruned_statistics.add_pruned(2); + file_metrics.row_groups_pruned_statistics.add_matched(5); + file_metrics.page_index_rows_pruned.add_fully_matched(3); + file_metrics.scan_efficiency_ratio.add_total(100); + } + + let metrics = metrics.clone_inner(); + assert_eq!( + metrics.sum_by_name("bytes_scanned").map(|v| v.as_usize()), + Some(42) + ); + assert_eq!( + metrics + .sum_by_name("predicate_cache_records") + .map(|v| v.as_usize()), + Some(9) + ); + assert_eq!( + metrics + .sum_by_name("metadata_load_time") + .map(|v| v.as_usize()), + Some(7) + ); + + match metrics + .sum_by_name("row_groups_pruned_statistics") + .expect("row_groups_pruned_statistics registered") + { + MetricValue::PruningMetrics { + pruning_metrics, .. + } => { + assert_eq!(pruning_metrics.pruned(), 2); + assert_eq!(pruning_metrics.matched(), 5); + assert_eq!(pruning_metrics.fully_matched(), 0); + } + metric => panic!("unexpected metric type: {metric:?}"), + } + + match metrics + .sum_by_name("page_index_rows_pruned") + .expect("page_index_rows_pruned registered") + { + MetricValue::PruningMetrics { + pruning_metrics, .. + } => { + assert_eq!(pruning_metrics.pruned(), 0); + assert_eq!(pruning_metrics.matched(), 0); + assert_eq!(pruning_metrics.fully_matched(), 3); + } + metric => panic!("unexpected metric type: {metric:?}"), + } + + match metrics + .sum_by_name("scan_efficiency_ratio") + .expect("scan_efficiency_ratio registered") + { + MetricValue::Ratio { ratio_metrics, .. } => { + assert_eq!(ratio_metrics.part(), 0); + assert_eq!(ratio_metrics.total(), 100); + } + metric => panic!("unexpected metric type: {metric:?}"), + } + + assert!(metrics.sum_by_name("predicate_evaluation_errors").is_none()); + } +} diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index e5929fd43f11a..2dd94f2eca77c 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -1233,6 +1233,7 @@ impl RowGroupsPrunedParquetOpen { let output_schema = Arc::clone(&prepared.output_schema); let files_ranges_pruned_statistics = prepared.file_metrics.files_ranges_pruned_statistics.clone(); + let file_metrics = prepared.file_metrics.clone(); let stream = PushDecoderStreamState { decoder, pending_decoders, @@ -1245,6 +1246,7 @@ impl RowGroupsPrunedParquetOpen { predicate_cache_inner_records, predicate_cache_records, baseline_metrics: prepared.baseline_metrics, + _file_metrics: file_metrics, } .into_stream(); @@ -1261,7 +1263,6 @@ impl RowGroupsPrunedParquetOpen { } } } - type ConstantColumns = HashMap; /// Extract constant column values from statistics, keyed by column name in the logical file schema. diff --git a/datafusion/datasource-parquet/src/push_decoder.rs b/datafusion/datasource-parquet/src/push_decoder.rs index 8b71be3e8de96..9f111b76a4dd4 100644 --- a/datafusion/datasource-parquet/src/push_decoder.rs +++ b/datafusion/datasource-parquet/src/push_decoder.rs @@ -48,6 +48,7 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_physical_expr::projection::Projector; use datafusion_physical_plan::metrics::{BaselineMetrics, Gauge}; +use crate::ParquetFileMetrics; use crate::access_plan::PreparedAccessPlan; use crate::row_filter::ParquetReadPlan; @@ -120,6 +121,11 @@ pub(crate) struct PushDecoderStreamState { pub(crate) predicate_cache_inner_records: Gauge, pub(crate) predicate_cache_records: Gauge, pub(crate) baseline_metrics: BaselineMetrics, + /// Keeps the parquet file metrics registration guard alive until this + /// stream finishes. The decoder and row filters update cloned metric + /// handles while streaming; dropping this early would register metrics + /// before those updates are recorded. + pub(crate) _file_metrics: ParquetFileMetrics, } impl PushDecoderStreamState { diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt index e779ce2cbffb0..08a44bed61995 100644 --- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt +++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt @@ -104,7 +104,7 @@ Plan with Metrics 03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, ] 04)------FilterExec: value@1 > 3, metrics=[output_rows=10, , selectivity=100% (10/10)] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, ] -06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=, scan_efficiency_ratio=18.31% (210/1.15 K)] +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_skipped_by_fully_matched=1, bytes_scanned=210, metadata_load_time=, scan_efficiency_ratio=18.31% (210/1.15 K)] statement ok set datafusion.explain.analyze_level = dev; diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt b/datafusion/sqllogictest/test_files/limit_pruning.slt index 373e1636a2bb6..7a7ffcaa5b309 100644 --- a/datafusion/sqllogictest/test_files/limit_pruning.slt +++ b/datafusion/sqllogictest/test_files/limit_pruning.slt @@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary; query TT explain analyze select * from tracking_data where species > 'M' AND s >= 50 limit 3; ---- -Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=0 total → 0 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] +Plan with Metrics DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=2 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (171/2.35 K)] statement ok CREATE TABLE fully_matched_limit_source AS VALUES @@ -120,7 +120,7 @@ explain analyze select * from tracking_data where species > 'M' AND s >= 50 orde ---- Plan with Metrics 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=, output_bytes=] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], sort_order_for_reorder=[species@0 ASC NULLS LAST], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_pages_skipped_by_fully_matched=1, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], sort_order_for_reorder=[species@0 ASC NULLS LAST], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=, output_bytes=, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=2 total → 2 matched, page_index_pages_skipped_by_fully_matched=1, bytes_scanned=, metadata_load_time=, scan_efficiency_ratio= (521/2.35 K)] statement ok drop table tracking_data; diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index b04b962a5df19..c3304f4cdf0b8 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -206,7 +206,7 @@ EXPLAIN ANALYZE SELECT t FROM topk_pushdown ORDER BY t * t LIMIT 10; ---- Plan with Metrics 01)SortExec: TopK(fetch=10), expr=[t@0 * t@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[t@0 * t@0 < 1884329474306198481], metrics=[output_rows=10, output_batches=1, row_replacements=10] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_pushdown.parquet]]}, projection=[t], output_ordering=[t@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ t@0 * t@0 < 1884329474306198481 ], metrics=[output_rows=128, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=782 total → 782 matched, row_groups_pruned_bloom_filter=782 total → 782 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=128, pushdown_rows_pruned=99.87 K, predicate_cache_inner_records=128, predicate_cache_records=128, scan_efficiency_ratio=64.87% (258.7 K/398.8 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_pushdown.parquet]]}, projection=[t], output_ordering=[t@0 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ t@0 * t@0 < 1884329474306198481 ], metrics=[output_rows=128, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=782 total → 782 matched, row_groups_pruned_bloom_filter=782 total → 782 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=128, pushdown_rows_pruned=99.87 K, predicate_cache_inner_records=128, predicate_cache_records=128, scan_efficiency_ratio=64.87% (258.7 K/398.8 K)] statement ok reset datafusion.explain.analyze_categories; @@ -268,7 +268,7 @@ EXPLAIN ANALYZE SELECT * FROM topk_single_col ORDER BY b DESC LIMIT 1; ---- Plan with Metrics 01)SortExec: TopK(fetch=1), expr=[b@1 DESC], preserve_partitioning=[false], filter=[b@1 IS NULL OR b@1 > bd], metrics=[output_rows=1, output_batches=1, row_replacements=1] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_single_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 IS NULL OR b@1 > bd ], sort_order_for_reorder=[b@1 DESC], reverse_row_groups=true, pruning_predicate=b_null_count@0 > 0 OR b_null_count@0 != row_count@2 AND b_max@1 > bd, required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=4, pushdown_rows_pruned=0, predicate_cache_inner_records=4, predicate_cache_records=4, scan_efficiency_ratio=22.37% (240/1.07 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_single_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 IS NULL OR b@1 > bd ], sort_order_for_reorder=[b@1 DESC], reverse_row_groups=true, pruning_predicate=b_null_count@0 > 0 OR b_null_count@0 != row_count@2 AND b_max@1 > bd, required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=4, predicate_cache_inner_records=4, predicate_cache_records=4, scan_efficiency_ratio=22.37% (240/1.07 K)] statement ok reset datafusion.explain.analyze_categories; @@ -319,7 +319,7 @@ EXPLAIN ANALYZE SELECT * FROM topk_multi_col ORDER BY b ASC NULLS LAST, a DESC L ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false], filter=[b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac)], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_multi_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ], sort_order_for_reorder=[b@1 ASC NULLS LAST, a@0 DESC], pruning_predicate=b_null_count@1 != row_count@2 AND b_min@0 < bb OR b_null_count@1 != row_count@2 AND b_min@0 <= bb AND bb <= b_max@3 AND (a_null_count@4 > 0 OR a_null_count@4 != row_count@2 AND a_max@5 > ac), required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=4, pushdown_rows_pruned=0, predicate_cache_inner_records=8, predicate_cache_records=8, scan_efficiency_ratio=22.37% (240/1.07 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_multi_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ], sort_order_for_reorder=[b@1 ASC NULLS LAST, a@0 DESC], pruning_predicate=b_null_count@1 != row_count@2 AND b_min@0 < bb OR b_null_count@1 != row_count@2 AND b_min@0 <= bb AND bb <= b_max@3 AND (a_null_count@4 > 0 OR a_null_count@4 != row_count@2 AND a_max@5 > ac), required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=4, predicate_cache_inner_records=8, predicate_cache_records=8, scan_efficiency_ratio=22.37% (240/1.07 K)] statement ok reset datafusion.explain.analyze_categories; @@ -388,8 +388,8 @@ FROM join_probe p INNER JOIN join_build AS build ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=20.48% (214/1.04 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] statement ok reset datafusion.explain.analyze_categories; @@ -474,9 +474,9 @@ INNER JOIN nested_t3 ON nested_t2.c = nested_t3.d; Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@3, d@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, b@0)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t1.parquet]]}, projection=[a, x], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.23% (144/790)] -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t2.parquet]]}, projection=[b, c, y], file_type=parquet, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ], pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 >= aa AND b_null_count@1 != row_count@2 AND b_min@3 <= ab AND (b_null_count@1 != row_count@2 AND b_min@3 <= aa AND aa <= b_max@0 OR b_null_count@1 != row_count@2 AND b_min@3 <= ab AND ab <= b_max@0), required_guarantees=[b in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=5 total → 5 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=3, predicate_cache_inner_records=5, predicate_cache_records=2, scan_efficiency_ratio=23.2% (252/1.09 K)] -05)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t3.parquet]]}, projection=[d, z], file_type=parquet, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND hash_lookup ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= ca AND d_null_count@1 != row_count@2 AND d_min@3 <= cb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=8 total → 8 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=6, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=22.12% (184/832)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t1.parquet]]}, projection=[a, x], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=18.23% (144/790)] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t2.parquet]]}, projection=[b, c, y], file_type=parquet, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ], pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 >= aa AND b_null_count@1 != row_count@2 AND b_min@3 <= ab AND (b_null_count@1 != row_count@2 AND b_min@3 <= aa AND aa <= b_max@0 OR b_null_count@1 != row_count@2 AND b_min@3 <= ab AND ab <= b_max@0), required_guarantees=[b in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=5 total → 5 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=3, predicate_cache_inner_records=5, predicate_cache_records=2, scan_efficiency_ratio=23.2% (252/1.09 K)] +05)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nested_t3.parquet]]}, projection=[d, z], file_type=parquet, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND hash_lookup ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= ca AND d_null_count@1 != row_count@2 AND d_min@3 <= cb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=8 total → 8 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=6, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=22.12% (184/832)] statement ok reset datafusion.explain.analyze_categories; @@ -605,8 +605,8 @@ LIMIT 2; Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[e@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[e@0 < bb], metrics=[output_rows=2, output_batches=1, row_replacements=2] 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, d@0)], projection=[e@2], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=6.7% (70/1.04 K)] -04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_probe.parquet]]}, projection=[d, e], file_type=parquet, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 < bb ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= aa AND d_null_count@1 != row_count@2 AND d_min@3 <= ab AND (d_null_count@1 != row_count@2 AND d_min@3 <= aa AND aa <= d_max@0 OR d_null_count@1 != row_count@2 AND d_min@3 <= ab AND ab <= d_max@0) AND e_null_count@5 != row_count@2 AND e_min@4 < bb, required_guarantees=[d in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=6.7% (70/1.04 K)] +04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_join_probe.parquet]]}, projection=[d, e], file_type=parquet, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 < bb ], pruning_predicate=d_null_count@1 != row_count@2 AND d_max@0 >= aa AND d_null_count@1 != row_count@2 AND d_min@3 <= ab AND (d_null_count@1 != row_count@2 AND d_min@3 <= aa AND aa <= d_max@0 OR d_null_count@1 != row_count@2 AND d_min@3 <= ab AND ab <= d_max@0) AND e_null_count@5 != row_count@2 AND e_min@4 < bb, required_guarantees=[d in (aa, ab)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] statement ok reset datafusion.explain.analyze_categories; @@ -655,7 +655,7 @@ EXPLAIN ANALYZE SELECT b, a FROM topk_proj ORDER BY a LIMIT 2; ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[a@1 ASC NULLS LAST], preserve_partitioning=[false], filter=[a@1 < 2], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[b, a], file_type=parquet, predicate=DynamicFilter [ a@0 < 2 ], sort_order_for_reorder=[a@1 ASC NULLS LAST], pruning_predicate=a_null_count@1 != row_count@2 AND a_min@0 < 2, required_guarantees=[], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[b, a], file_type=parquet, predicate=DynamicFilter [ a@0 < 2 ], sort_order_for_reorder=[a@1 ASC NULLS LAST], pruning_predicate=a_null_count@1 != row_count@2 AND a_min@0 < 2, required_guarantees=[], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=3, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] # Case 2: prune — `SELECT a` — filter stays as `a < 2` on the scan. query TT @@ -663,7 +663,7 @@ EXPLAIN ANALYZE SELECT a FROM topk_proj ORDER BY a LIMIT 2; ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[a@0 < 2], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[a], file_type=parquet, predicate=DynamicFilter [ a@0 < 2 ], sort_order_for_reorder=[a@0 ASC NULLS LAST], pruning_predicate=a_null_count@1 != row_count@2 AND a_min@0 < 2, required_guarantees=[], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=7.09% (79/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[a], file_type=parquet, predicate=DynamicFilter [ a@0 < 2 ], sort_order_for_reorder=[a@0 ASC NULLS LAST], pruning_predicate=a_null_count@1 != row_count@2 AND a_min@0 < 2, required_guarantees=[], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=3, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=7.09% (79/1.11 K)] # Case 3: expression — `SELECT a+1 AS a_plus_1` — the TopK filter is on # `a_plus_1`, the scan predicate must read `a@0 + 1`. @@ -672,7 +672,7 @@ EXPLAIN ANALYZE SELECT a + 1 AS a_plus_1, b FROM topk_proj ORDER BY a_plus_1 LIM ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[a_plus_1@0 < 3], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[CAST(a@0 AS Int64) + 1 as a_plus_1, b], file_type=parquet, predicate=DynamicFilter [ CAST(a@0 AS Int64) + 1 < 3 ], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[CAST(a@0 AS Int64) + 1 as a_plus_1, b], file_type=parquet, predicate=DynamicFilter [ CAST(a@0 AS Int64) + 1 < 3 ], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=3, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] # Case 4: alias shadowing — `SELECT a+1 AS a` — the projection renames # `a+1` to `a`, so the TopK's `a < 3` must still be rewritten to @@ -682,7 +682,7 @@ EXPLAIN ANALYZE SELECT a + 1 AS a, b FROM topk_proj ORDER BY a LIMIT 2; ---- Plan with Metrics 01)SortExec: TopK(fetch=2), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[a@0 < 3], metrics=[output_rows=2, output_batches=1, row_replacements=2] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[CAST(a@0 AS Int64) + 1 as a, b], file_type=parquet, predicate=DynamicFilter [ CAST(a@0 AS Int64) + 1 < 3 ], sort_order_for_reorder=[a@0 ASC NULLS LAST], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=3, pushdown_rows_pruned=0, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_proj.parquet]]}, projection=[CAST(a@0 AS Int64) + 1 as a, b], file_type=parquet, predicate=DynamicFilter [ CAST(a@0 AS Int64) + 1 < 3 ], sort_order_for_reorder=[a@0 ASC NULLS LAST], metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=3, predicate_cache_inner_records=3, predicate_cache_records=3, scan_efficiency_ratio=13.72% (153/1.11 K)] statement ok reset datafusion.explain.analyze_categories; @@ -739,12 +739,12 @@ INNER JOIN ( ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, min_value@2], metrics=[output_rows=2, output_batches=2, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=15.32% (70/457)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_build.parquet]]}, projection=[a], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=15.32% (70/457)] 03)--ProjectionExec: expr=[a@0 as a, min(join_agg_probe.value)@1 as min_value], metrics=[output_rows=2, output_batches=2] 04)----AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[min(join_agg_probe.value)], metrics=[output_rows=2, output_batches=2, spill_count=0, spilled_rows=0] 05)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1, metrics=[output_rows=2, output_batches=2, spill_count=0, spilled_rows=0] 06)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[min(join_agg_probe.value)], metrics=[output_rows=2, output_batches=1, spill_count=0, spilled_rows=0, skipped_aggregation_rows=0, reduction_factor=100% (2/2)] -07)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_probe.parquet]]}, projection=[a, value], file_type=parquet, predicate=DynamicFilter [ a@0 >= h1 AND a@0 <= h2 AND a@0 IN (SET) ([h1, h2]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= h1 AND a_null_count@1 != row_count@2 AND a_min@3 <= h2 AND (a_null_count@1 != row_count@2 AND a_min@3 <= h1 AND h1 <= a_max@0 OR a_null_count@1 != row_count@2 AND a_min@3 <= h2 AND h2 <= a_max@0), required_guarantees=[a in (h1, h2)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=4, predicate_cache_records=2, scan_efficiency_ratio=19.81% (163/823)] +07)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/join_agg_probe.parquet]]}, projection=[a, value], file_type=parquet, predicate=DynamicFilter [ a@0 >= h1 AND a@0 <= h2 AND a@0 IN (SET) ([h1, h2]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= h1 AND a_null_count@1 != row_count@2 AND a_min@3 <= h2 AND (a_null_count@1 != row_count@2 AND a_min@3 <= h1 AND h1 <= a_max@0 OR a_null_count@1 != row_count@2 AND a_min@3 <= h2 AND h2 <= a_max@0), required_guarantees=[a in (h1, h2)], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, page_index_rows_pruned=4 total → 4 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=4, predicate_cache_records=2, scan_efficiency_ratio=19.81% (163/823)] statement ok reset datafusion.explain.analyze_categories; @@ -806,8 +806,8 @@ ON nulls_build.a = nulls_probe.a AND nulls_build.b = nulls_probe.b; ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=1, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=3, input_batches=1, input_rows=1, avg_fanout=100% (1/1), probe_hit_rate=100% (1/1)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_build.parquet]]}, projection=[a, b], file_type=parquet, metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=18.6% (144/774)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_probe.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= 1 AND b_null_count@5 != row_count@2 AND b_min@6 <= 2, required_guarantees=[], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=3, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=21.1% (237/1.12 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_build.parquet]]}, projection=[a, b], file_type=parquet, metrics=[output_rows=3, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=18.6% (144/774)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/nulls_probe.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= 1 AND b_null_count@5 != row_count@2 AND b_min@6 <= 2, required_guarantees=[], metrics=[output_rows=1, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=3, predicate_cache_inner_records=8, predicate_cache_records=2, scan_efficiency_ratio=21.1% (237/1.12 K)] statement ok reset datafusion.explain.analyze_categories; @@ -872,8 +872,8 @@ ON lj_build.a = lj_probe.a AND lj_build.b = lj_probe.b; ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=20.48% (214/1.04 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] # LEFT SEMI JOIN: only matching build rows are returned; probe scan still # receives the dynamic filter. @@ -888,8 +888,8 @@ WHERE EXISTS ( ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), (b@1, b@1)], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=2, input_rows=4, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=20.48% (214/1.04 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/lj_probe.parquet]]}, projection=[a, b], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=15.37% (166/1.08 K)] statement ok reset datafusion.explain.analyze_categories; @@ -958,8 +958,8 @@ FROM hl_probe p INNER JOIN hl_build AS build ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], projection=[a@3, b@4, c@2, e@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=20.48% (214/1.04 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_build.parquet]]}, projection=[a, b, c], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=20.48% (214/1.04 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/hl_probe.parquet]]}, projection=[a, b, e], file_type=parquet, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND hash_lookup ], pruning_predicate=a_null_count@1 != row_count@2 AND a_max@0 >= aa AND a_null_count@1 != row_count@2 AND a_min@3 <= ab AND b_null_count@5 != row_count@2 AND b_max@4 >= ba AND b_null_count@5 != row_count@2 AND b_min@6 <= bb, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=22.78% (246/1.08 K)] statement ok drop table hl_build; @@ -1007,8 +1007,8 @@ FROM int_build b INNER JOIN int_probe p ---- Plan with Metrics 01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id1@0, id1@0), (id2@1, id2@1)], projection=[id1@0, id2@1, value@2, data@5], metrics=[output_rows=2, output_batches=1, array_map_created_count=0, build_input_batches=1, build_input_rows=2, input_batches=1, input_rows=2, avg_fanout=100% (2/2), probe_hit_rate=100% (2/2)] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_build.parquet]]}, projection=[id1, id2, value], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=19.02% (222/1.17 K)] -03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_probe.parquet]]}, projection=[id1, id2, data], file_type=parquet, predicate=DynamicFilter [ id1@0 >= 1 AND id1@0 <= 2 AND id2@1 >= 10 AND id2@1 <= 20 AND hash_lookup ], pruning_predicate=id1_null_count@1 != row_count@2 AND id1_max@0 >= 1 AND id1_null_count@1 != row_count@2 AND id1_min@3 <= 2 AND id2_null_count@5 != row_count@2 AND id2_max@4 >= 10 AND id2_null_count@5 != row_count@2 AND id2_min@6 <= 20, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=21.43% (239/1.11 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_build.parquet]]}, projection=[id1, id2, value], file_type=parquet, metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, scan_efficiency_ratio=19.02% (222/1.17 K)] +03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/int_probe.parquet]]}, projection=[id1, id2, data], file_type=parquet, predicate=DynamicFilter [ id1@0 >= 1 AND id1@0 <= 2 AND id2@1 >= 10 AND id2@1 <= 20 AND hash_lookup ], pruning_predicate=id1_null_count@1 != row_count@2 AND id1_max@0 >= 1 AND id1_null_count@1 != row_count@2 AND id1_min@3 <= 2 AND id2_null_count@5 != row_count@2 AND id2_max@4 >= 10 AND id2_null_count@5 != row_count@2 AND id2_min@6 <= 20, required_guarantees=[], metrics=[output_rows=2, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, pushdown_rows_matched=2, pushdown_rows_pruned=2, predicate_cache_inner_records=8, predicate_cache_records=4, scan_efficiency_ratio=21.43% (239/1.11 K)] statement ok reset datafusion.explain.analyze_categories;