Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -892,8 +892,11 @@ mod tests {
.await;
// There should be no predicate evaluation errors
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
assert_eq!(
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
0
);
assert_eq!(get_value_or_zero(&metrics, "pushdown_rows_matched"), 0);
assert_eq!(rt.batches.unwrap().len(), 0);

// Predicate should prune no row groups
Expand All @@ -905,8 +908,11 @@ mod tests {
.await;
// There should be no predicate evaluation errors
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
assert_eq!(
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
0
);
assert_eq!(get_value_or_zero(&metrics, "pushdown_rows_matched"), 0);
let read = rt
.batches
.unwrap()
Expand Down Expand Up @@ -934,7 +940,10 @@ mod tests {
.await;
// There should be no predicate evaluation errors
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
0
);
assert_eq!(rt.batches.unwrap().len(), 0);

// Predicate should prune no row groups
Expand All @@ -946,7 +955,10 @@ mod tests {
.await;
// There should be no predicate evaluation errors
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
0
);
let read = rt
.batches
.unwrap()
Expand Down Expand Up @@ -986,15 +998,21 @@ mod tests {
.await;
// There should be no predicate evaluation errors and we keep 1 row
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
0
);
let read = rt
.batches
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum::<usize>();
assert_eq!(read, 1, "Expected 1 rows to match the predicate");
assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
assert_eq!(
get_value_or_zero(&metrics, "row_groups_pruned_statistics"),
0
);
assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
assert_eq!(get_value(&metrics, "page_index_pages_pruned"), 1);
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
Expand All @@ -1012,7 +1030,10 @@ mod tests {
.await;
// There should be no predicate evaluation errors and we keep 0 rows
let metrics = rt.parquet_exec.metrics().unwrap();
assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
assert_eq!(
get_value_or_zero(&metrics, "predicate_evaluation_errors"),
0
);
let read = rt
.batches
.unwrap()
Expand Down Expand Up @@ -2147,6 +2168,18 @@ mod tests {
}
}

/// Returns the summed value of the metric called `metric_name`, or `0` when
/// `metrics` holds no metric with that name.
///
/// For a `MetricValue::PruningMetrics` entry the pruned count is reported;
/// every other metric kind is converted with `as_usize`.
fn get_value_or_zero(metrics: &MetricsSet, metric_name: &str) -> usize {
    match metrics.sum_by_name(metric_name) {
        Some(MetricValue::PruningMetrics {
            pruning_metrics, ..
        }) => pruning_metrics.pruned(),
        Some(other) => other.as_usize(),
        // An absent metric is treated the same as a recorded zero.
        None => 0,
    }
}

fn get_pruning_metric(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
match metrics.sum_by_name(metric_name) {
Some(MetricValue::PruningMetrics {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/external_access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ async fn skip_all() {
.await;

// Verify that skipping all row groups skips reading any data at all
let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap();
let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap_or(0);
assert_eq!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
}

Expand Down
79 changes: 62 additions & 17 deletions datafusion/core/tests/parquet/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,17 @@ impl<'a> TestCase<'a> {
PushdownExpected::None
};

let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");
let pushdown_rows_pruned = match pushdown_expected {
PushdownExpected::None => {
metric_value_or_zero(&metrics, "pushdown_rows_pruned")
}
PushdownExpected::Some => {
expect_metric_value(&metrics, "pushdown_rows_pruned")
}
};
let pushdown_rows_matched =
metric_value_or_zero(&metrics, "pushdown_rows_matched");
println!(" pushdown_rows_pruned: {pushdown_rows_pruned}");
let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
println!(" pushdown_rows_matched: {pushdown_rows_matched}");

match pushdown_expected {
Expand All @@ -562,11 +570,6 @@ impl<'a> TestCase<'a> {
}
};

let (page_index_rows_pruned, page_index_rows_matched) =
get_pruning_metrics(&metrics, "page_index_rows_pruned");
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
println!(" page_index_rows_matched: {page_index_rows_matched}");

let page_index_filtering_expected = if scan_options.enable_page_index {
self.page_index_filtering_expected
} else {
Expand All @@ -577,9 +580,17 @@ impl<'a> TestCase<'a> {

match page_index_filtering_expected {
PageIndexFilteringExpected::None => {
let (page_index_rows_pruned, page_index_rows_matched) =
pruning_metrics_or_zero(&metrics, "page_index_rows_pruned");
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
println!(" page_index_rows_matched: {page_index_rows_matched}");
assert_eq!(page_index_rows_pruned, 0);
}
PageIndexFilteringExpected::Some => {
let (page_index_rows_pruned, page_index_rows_matched) =
expect_pruning_metrics(&metrics, "page_index_rows_pruned");
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
println!(" page_index_rows_matched: {page_index_rows_matched}");
assert!(
page_index_rows_pruned > 0,
"Expected to filter rows via page index but none were",
Expand All @@ -591,29 +602,55 @@ impl<'a> TestCase<'a> {
}
}

fn get_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
/// Lenient lookup of a pruning metric: forwards to `get_pruning_metrics`
/// with `allow_missing` set to `true`, the mode intended to tolerate a
/// metric that was never recorded.
fn pruning_metrics_or_zero(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
    let allow_missing = true;
    get_pruning_metrics(metrics, metric_name, allow_missing)
}

/// Strict lookup of a pruning metric: forwards to `get_pruning_metrics`
/// with `allow_missing` set to `false`, so the metric is required to exist.
fn expect_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
    let allow_missing = false;
    get_pruning_metrics(metrics, metric_name, allow_missing)
}

/// Returns `(pruned, matched)` for the pruning metric called `metric_name`.
///
/// When no metric with that name exists, `(0, 0)` is returned if
/// `allow_missing` is `true`.
///
/// # Panics
///
/// Panics if the metric exists but is not a `MetricValue::PruningMetrics`,
/// or if it is absent and `allow_missing` is `false`.
fn get_pruning_metrics(
    metrics: &MetricsSet,
    metric_name: &str,
    allow_missing: bool,
) -> (usize, usize) {
    match metrics.sum_by_name(metric_name) {
        Some(MetricValue::PruningMetrics {
            pruning_metrics, ..
        }) => (pruning_metrics.pruned(), pruning_metrics.matched()),
        Some(_) => {
            panic!("Metric '{metric_name}' is not a pruning metric in\n\n{metrics:#?}")
        }
        // The guarded arm must come before the unconditional `None` arm;
        // otherwise the missing-metric case could never yield zeros.
        None if allow_missing => (0, 0),
        None => panic!("Expected metric '{metric_name}' not found in\n\n{metrics:#?}"),
    }
}

fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
/// Lenient lookup of a metric value: forwards to `get_value` with
/// `allow_missing` set to `true`, the mode intended to tolerate a metric
/// that was never recorded.
fn metric_value_or_zero(metrics: &MetricsSet, metric_name: &str) -> usize {
    let allow_missing = true;
    get_value(metrics, metric_name, allow_missing)
}

/// Strict lookup of a metric value: forwards to `get_value` with
/// `allow_missing` set to `false`, so the metric is required to exist.
fn expect_metric_value(metrics: &MetricsSet, metric_name: &str) -> usize {
    let allow_missing = false;
    get_value(metrics, metric_name, allow_missing)
}

/// Looks up `metric_name` through `get_value`, choosing the lookup mode from
/// the value the caller expects: a missing metric is allowed only when
/// `expected` is `0`.
fn metric_value_for_expected(
    metrics: &MetricsSet,
    metric_name: &str,
    expected: usize,
) -> usize {
    // An expectation of zero is the only case where absence is acceptable.
    let allow_missing = expected == 0;
    get_value(metrics, metric_name, allow_missing)
}

/// Returns the summed value of the metric called `metric_name`.
///
/// For a `MetricValue::PruningMetrics` entry the pruned count is reported;
/// every other metric kind is converted with `as_usize`. When no metric with
/// that name exists, `0` is returned if `allow_missing` is `true`.
///
/// # Panics
///
/// Panics if the metric is absent and `allow_missing` is `false`.
fn get_value(metrics: &MetricsSet, metric_name: &str, allow_missing: bool) -> usize {
    match metrics.sum_by_name(metric_name) {
        Some(MetricValue::PruningMetrics {
            pruning_metrics, ..
        }) => pruning_metrics.pruned(),
        Some(v) => v.as_usize(),
        // The guarded arm must come before the unconditional `None` arm;
        // otherwise the missing-metric case could never yield zero.
        None if allow_missing => 0,
        None => panic!("Expected metric '{metric_name}' not found in\n\n{metrics:#?}"),
    }
}

Expand Down Expand Up @@ -736,11 +773,19 @@ impl PredicateCacheTest {

// verify the predicate cache metrics
assert_eq!(
get_value(&metrics, "predicate_cache_inner_records"),
metric_value_for_expected(
&metrics,
"predicate_cache_inner_records",
expected_inner_records
),
expected_inner_records
);
assert_eq!(
get_value(&metrics, "predicate_cache_records"),
metric_value_for_expected(
&metrics,
"predicate_cache_records",
expected_records
),
expected_records
);
Ok(())
Expand Down
14 changes: 14 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,20 @@ impl TestOutput {
}
}

/// Normalizes an observed metric so it can be compared against `expected`.
///
/// Lazy parquet metrics may represent zero as an absent metric, so when the
/// expectation is `Some(0)` a missing `actual` is reported as `Some(0)`. For
/// any other expectation `actual` is returned unchanged.
///
/// Tests that specifically enforce lazy-skipping zero metrics live with the
/// metrics implementation; these integration tests only compare scan
/// behavior.
pub(crate) fn zero_if_metric_absent(
    actual: Option<usize>,
    expected: Option<usize>,
) -> Option<usize> {
    match expected {
        // Absent and zero are interchangeable only when zero is expected.
        Some(0) => actual.or(Some(0)),
        _ => actual,
    }
}

/// Creates an execution context that has an external table "t"
/// registered pointing at a parquet file made with `make_test_file`
/// and the appropriate scenario
Expand Down
17 changes: 13 additions & 4 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use std::sync::Arc;

use crate::parquet::Unit::Page;
use crate::parquet::{ContextWithParquet, Scenario};
use crate::parquet::{ContextWithParquet, Scenario, zero_if_metric_absent};

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
Expand Down Expand Up @@ -238,8 +238,14 @@ async fn test_prune(
.await;

println!("{}", output.description());
assert_eq!(output.predicate_evaluation_errors(), expected_errors);
assert_eq!(output.row_pages_pruned(), expected_row_pages_pruned);
assert_eq!(
zero_if_metric_absent(output.predicate_evaluation_errors(), expected_errors),
expected_errors
);
assert_eq!(
zero_if_metric_absent(output.row_pages_pruned(), expected_row_pages_pruned),
expected_row_pages_pruned
);
assert_eq!(
output.result_rows,
expected_results,
Expand Down Expand Up @@ -360,7 +366,10 @@ async fn prune_date64() {

println!("{}", output.description());
// This should prune out groups without error
assert_eq!(output.predicate_evaluation_errors(), Some(0));
assert_eq!(
zero_if_metric_absent(output.predicate_evaluation_errors(), Some(0)),
Some(0)
);
assert_eq!(output.row_pages_pruned(), Some(15));
assert_eq!(output.result_rows, 1, "{}", output.description());
}
Expand Down
Loading
Loading