Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use datafusion_expr::{
};

use crate::optimizer::ApplyOrder;
use crate::simplify_expressions::simplify_predicates;
use crate::simplify_expressions::{reorder_predicates, simplify_predicates};
use crate::utils::{
ColumnReference, has_all_column_refs, is_restrict_null_predicate, schema_columns,
};
Expand Down Expand Up @@ -778,24 +778,31 @@ impl OptimizerRule for PushDownFilter {
return push_down_join(join, None);
};

let plan_schema = Arc::clone(plan.schema());

let LogicalPlan::Filter(mut filter) = plan else {
return Ok(Transformed::no(plan));
};
let plan_schema = Arc::clone(filter.input.schema());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to the core change but should save a few cycles.


let predicate = split_conjunction_owned(filter.predicate.clone());
let old_predicate_len = predicate.len();
let new_predicates =
with_debug_timing("simplify_predicates", || simplify_predicates(predicate))?;

if log_enabled!(Level::Debug) {
debug!(
"push_down_filter: simplify_predicates old_count={}, new_count={}",
old_predicate_len,
new_predicates.len()
);
}
if old_predicate_len != new_predicates.len() {

// Place cheap predicates before expensive ones, so the `AND`
// evaluator's right-side short-circuit can skip evaluating expensive
// predicates on rows that have already been filtered out.
let (new_predicates, reorder_changed) = reorder_predicates(new_predicates);

let count_changed = old_predicate_len != new_predicates.len();
if count_changed || reorder_changed {
let Some(new_predicate) = conjunction(new_predicates) else {
// new_predicates is empty - remove the filter entirely
// Return the child plan without the filter
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod expr_simplifier;
mod inlist_simplifier;
mod linear_aggregates;
mod regex;
mod reorder_predicates;
pub mod simplify_exprs;
pub mod simplify_literal;
mod simplify_predicates;
Expand All @@ -33,6 +34,7 @@ mod utils;
pub use datafusion_expr::simplify::SimplifyContext;

pub use expr_simplifier::*;
pub(crate) use reorder_predicates::reorder_predicates;
pub use simplify_exprs::*;
pub use simplify_predicates::simplify_predicates;

Expand Down
190 changes: 190 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/reorder_predicates.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Reorder conjunctive (`AND`) predicates so that cheap predicates run before
//! expensive ones.
//!
//! DataFusion's `AND` evaluator short-circuits the right-hand side when the
//! left-hand side keeps few rows, so leading with a cheap predicate shrinks
//! the batch that expensive ones see.
//!
//! The cost of evaluating a predicate is assessed with a simple, conservative
//! heuristic: we define an allow-list of cheap operations, and consider an
//! expression to be cheap if it consists ONLY of cheap operations; everything
//! else is considered expensive.
//!
//! The sort is stable, so order within each class is preserved.

use datafusion_common::tree_node::TreeNode;
use datafusion_expr::{BinaryExpr, Expr, Operator};

/// Stable partition of `predicates`: cheap first, then expensive.
///
/// Returns `(predicates, changed)`. When `changed` is `false` the input was
/// already cheap-first and the caller can skip rebuilding the conjunction.
pub(crate) fn reorder_predicates(predicates: Vec<Expr>) -> (Vec<Expr>, bool) {
if predicates.len() <= 1 {
return (predicates, false);
}

// Volatile predicates may have observable side-effects and reordering
// conjuncts can change how many times they evaluate. Preserve user order
// if any predicate contains a volatile expression.
if predicates.iter().any(Expr::is_volatile) {
return (predicates, false);
}

let classes: Vec<bool> = predicates.iter().map(is_cheap_predicate).collect();

// A reorder is needed iff an expensive predicate precedes a cheap one
let needs_reorder = classes.windows(2).any(|w| !w[0] && w[1]);
if !needs_reorder {
return (predicates, false);
}

let mut cheap = Vec::with_capacity(predicates.len());
let mut expensive = Vec::new();
for (p, is_cheap) in predicates.into_iter().zip(classes) {
if is_cheap {
cheap.push(p);
} else {
expensive.push(p);
}
}
cheap.extend(expensive);
(cheap, true)
}

/// Returns true if every node in `expr`'s tree is cheap.
fn is_cheap_predicate(expr: &Expr) -> bool {
!expr
.exists(|node| Ok(!is_cheap_node(node)))
.expect("is_cheap_node is infallible")
}

/// Returns true if `expr` is itself cheap.
///
/// We use a simple, conservative heuristic to determine if an expression is
/// cheap to evaluate: we enumerate known-cheap operations (e.g., equality
/// comparisons, negations, casts), and consider anything outside this list to
/// be expensive. New/unrecognized expressions therefore default to being
/// expensive.
fn is_cheap_node(expr: &Expr) -> bool {
match expr {
// Direct reads and literals.
Expr::Column(_)
| Expr::Literal(_, _)
| Expr::ScalarVariable(_, _)
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn(_, _)
| Expr::LambdaVariable(_)
// Wrappers; children are walked separately by `is_cheap_predicate`.
| Expr::Alias(_)
// Single-row unary predicates and arithmetic negation.
| Expr::Not(_)
| Expr::Negative(_)
| Expr::IsNull(_)
| Expr::IsNotNull(_)
| Expr::IsTrue(_)
| Expr::IsFalse(_)
| Expr::IsUnknown(_)
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::IsNotUnknown(_)
// Composite cheap forms; child expressions are walked separately.
| Expr::Between(_)
| Expr::Case(_)
| Expr::Cast(_)
| Expr::TryCast(_)
| Expr::InList(_) => true,
// BinaryExpr is cheap unless the operator is LIKE or regexp matching.
Expr::BinaryExpr(BinaryExpr { op, .. }) => !matches!(
op,
Operator::LikeMatch
| Operator::ILikeMatch
| Operator::NotLikeMatch
| Operator::NotILikeMatch
| Operator::RegexMatch
| Operator::RegexIMatch
| Operator::RegexNotMatch
| Operator::RegexNotIMatch
),
_ => false,
}
}

#[cfg(test)]
mod tests {
use super::*;
use datafusion_expr::{col, lit};

#[test]
fn like_predicate_moves_after_equality() {
let cheap = col("a").eq(lit(1));
let expensive = col("b").like(lit("%foo%"));
let (out, changed) = reorder_predicates(vec![expensive.clone(), cheap.clone()]);
assert_eq!(out, vec![cheap, expensive]);
assert!(changed);
}

#[test]
fn order_among_cheap_predicates_is_preserved() {
let p1 = col("a").eq(lit(1));
let p2 = col("b").eq(lit(2));
let p3 = col("c").eq(lit(3));
let input = vec![p1.clone(), p2.clone(), p3.clone()];
let (out, changed) = reorder_predicates(input.clone());
assert_eq!(out, input);
assert!(!changed);
}

#[test]
fn order_among_expensive_predicates_is_preserved() {
let p1 = col("a").like(lit("%a%"));
let p2 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col("b")),
Operator::RegexMatch,
Box::new(lit("foo")),
));
let p3 = col("c").like(lit("%c%"));
let input = vec![p1.clone(), p2.clone(), p3.clone()];
let (out, changed) = reorder_predicates(input.clone());
assert_eq!(out, input);
assert!(!changed);
}

#[test]
fn already_cheap_first_reports_no_change() {
let cheap = col("a").eq(lit(1));
let expensive = col("b").like(lit("%a%"));
let input = vec![cheap.clone(), expensive.clone()];
let (out, changed) = reorder_predicates(input.clone());
assert_eq!(out, input);
assert!(!changed);
}

#[test]
fn nested_expensive_under_not_is_expensive() {
// The top node is `Not`, which is on the cheap allow-list. The walk
// must descend into the `Like` to flag this predicate as expensive.
let cheap = col("a").eq(lit(1));
let nested = Expr::Not(Box::new(col("b").like(lit("%foo%"))));
let (out, changed) = reorder_predicates(vec![nested.clone(), cheap.clone()]);
assert_eq!(out, vec![cheap, nested]);
assert!(changed);
}
}
16 changes: 8 additions & 8 deletions datafusion/sqllogictest/test_files/clickbench.slt
Original file line number Diff line number Diff line change
Expand Up @@ -593,18 +593,18 @@ logical_plan
02)--Projection: hits.SearchPhrase, min(hits.URL), count(Int64(1)) AS count(*) AS c
03)----Aggregate: groupBy=[[hits.SearchPhrase]], aggr=[[min(hits.URL), count(Int64(1))]]
04)------SubqueryAlias: hits
05)--------Filter: hits_raw.URL LIKE Utf8View("%google%") AND hits_raw.SearchPhrase != Utf8View("")
06)----------TableScan: hits_raw projection=[URL, SearchPhrase], partial_filters=[hits_raw.URL LIKE Utf8View("%google%"), hits_raw.SearchPhrase != Utf8View("")]
05)--------Filter: hits_raw.SearchPhrase != Utf8View("") AND hits_raw.URL LIKE Utf8View("%google%")
06)----------TableScan: hits_raw projection=[URL, SearchPhrase], partial_filters=[hits_raw.SearchPhrase != Utf8View(""), hits_raw.URL LIKE Utf8View("%google%")]
physical_plan
01)SortPreservingMergeExec: [c@2 DESC], fetch=10
02)--SortExec: TopK(fetch=10), expr=[c@2 DESC], preserve_partitioning=[true]
03)----ProjectionExec: expr=[SearchPhrase@0 as SearchPhrase, min(hits.URL)@1 as min(hits.URL), count(Int64(1))@2 as c]
04)------AggregateExec: mode=FinalPartitioned, gby=[SearchPhrase@0 as SearchPhrase], aggr=[min(hits.URL), count(Int64(1))]
05)--------RepartitionExec: partitioning=Hash([SearchPhrase@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[SearchPhrase@1 as SearchPhrase], aggr=[min(hits.URL), count(Int64(1))]
07)------------FilterExec: URL@0 LIKE %google% AND SearchPhrase@1 !=
07)------------FilterExec: SearchPhrase@1 != AND URL@0 LIKE %google%
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[URL, SearchPhrase], file_type=parquet, predicate=URL@13 LIKE %google% AND SearchPhrase@39 != , pruning_predicate=SearchPhrase_null_count@4 != row_count@5 AND (SearchPhrase_min@2 != OR != SearchPhrase_max@3), required_guarantees=[SearchPhrase not in ()]
09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[URL, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != AND URL@13 LIKE %google%, pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()]

query TTI
SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
Expand All @@ -619,18 +619,18 @@ logical_plan
02)--Projection: hits.SearchPhrase, min(hits.URL), min(hits.Title), count(Int64(1)) AS count(*) AS c, count(DISTINCT hits.UserID)
03)----Aggregate: groupBy=[[hits.SearchPhrase]], aggr=[[min(hits.URL), min(hits.Title), count(Int64(1)), count(DISTINCT hits.UserID)]]
04)------SubqueryAlias: hits
05)--------Filter: hits_raw.Title LIKE Utf8View("%Google%") AND hits_raw.URL NOT LIKE Utf8View("%.google.%") AND hits_raw.SearchPhrase != Utf8View("")
06)----------TableScan: hits_raw projection=[Title, UserID, URL, SearchPhrase], partial_filters=[hits_raw.Title LIKE Utf8View("%Google%"), hits_raw.URL NOT LIKE Utf8View("%.google.%"), hits_raw.SearchPhrase != Utf8View("")]
05)--------Filter: hits_raw.SearchPhrase != Utf8View("") AND hits_raw.Title LIKE Utf8View("%Google%") AND hits_raw.URL NOT LIKE Utf8View("%.google.%")
06)----------TableScan: hits_raw projection=[Title, UserID, URL, SearchPhrase], partial_filters=[hits_raw.SearchPhrase != Utf8View(""), hits_raw.Title LIKE Utf8View("%Google%"), hits_raw.URL NOT LIKE Utf8View("%.google.%")]
physical_plan
01)SortPreservingMergeExec: [c@3 DESC], fetch=10
02)--SortExec: TopK(fetch=10), expr=[c@3 DESC], preserve_partitioning=[true]
03)----ProjectionExec: expr=[SearchPhrase@0 as SearchPhrase, min(hits.URL)@1 as min(hits.URL), min(hits.Title)@2 as min(hits.Title), count(Int64(1))@3 as c, count(DISTINCT hits.UserID)@4 as count(DISTINCT hits.UserID)]
04)------AggregateExec: mode=FinalPartitioned, gby=[SearchPhrase@0 as SearchPhrase], aggr=[min(hits.URL), min(hits.Title), count(Int64(1)), count(DISTINCT hits.UserID)]
05)--------RepartitionExec: partitioning=Hash([SearchPhrase@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[SearchPhrase@3 as SearchPhrase], aggr=[min(hits.URL), min(hits.Title), count(Int64(1)), count(DISTINCT hits.UserID)]
07)------------FilterExec: Title@0 LIKE %Google% AND URL@2 NOT LIKE %.google.% AND SearchPhrase@3 !=
07)------------FilterExec: SearchPhrase@3 != AND Title@0 LIKE %Google% AND URL@2 NOT LIKE %.google.%
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[Title, UserID, URL, SearchPhrase], file_type=parquet, predicate=Title@2 LIKE %Google% AND URL@13 NOT LIKE %.google.% AND SearchPhrase@39 != , pruning_predicate=SearchPhrase_null_count@6 != row_count@7 AND (SearchPhrase_min@4 != OR != SearchPhrase_max@5), required_guarantees=[SearchPhrase not in ()]
09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[Title, UserID, URL, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != AND Title@2 LIKE %Google% AND URL@13 NOT LIKE %.google.%, pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()]

query TTTII
SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/simplify_predicates.slt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ WHERE int_col > 5
AND float_col BETWEEN 1 AND 100;
----
logical_plan
01)Filter: test_data.str_col LIKE Utf8View("A%") AND test_data.float_col >= Float32(1) AND test_data.float_col <= Float32(100) AND test_data.int_col > Int32(10)
01)Filter: test_data.float_col >= Float32(1) AND test_data.float_col <= Float32(100) AND test_data.int_col > Int32(10) AND test_data.str_col LIKE Utf8View("A%")
02)--TableScan: test_data projection=[int_col, float_col, str_col, date_col, bool_col]

statement ok
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ logical_plan
06)----------Projection: partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size
07)------------Inner Join: partsupp.ps_partkey = part.p_partkey
08)--------------TableScan: partsupp projection=[ps_partkey, ps_suppkey]
09)--------------Filter: part.p_brand != Utf8View("Brand#45") AND part.p_type NOT LIKE Utf8View("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
10)----------------TableScan: part projection=[p_partkey, p_brand, p_type, p_size], partial_filters=[part.p_brand != Utf8View("Brand#45"), part.p_type NOT LIKE Utf8View("MEDIUM POLISHED%"), part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])]
09)--------------Filter: part.p_brand != Utf8View("Brand#45") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)]) AND part.p_type NOT LIKE Utf8View("MEDIUM POLISHED%")
10)----------------TableScan: part projection=[p_partkey, p_brand, p_type, p_size], partial_filters=[part.p_brand != Utf8View("Brand#45"), part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)]), part.p_type NOT LIKE Utf8View("MEDIUM POLISHED%")]
11)----------SubqueryAlias: __correlated_sq_1
12)------------Projection: supplier.s_suppkey
13)--------------Filter: supplier.s_comment LIKE Utf8View("%Customer%Complaints%")
Expand All @@ -80,7 +80,7 @@ physical_plan
13)------------------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4
14)--------------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:0..2932049], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:2932049..5864098], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:5864098..8796147], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/partsupp.tbl:8796147..11728193]]}, projection=[ps_partkey, ps_suppkey], file_type=csv, has_header=false
15)------------------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4
16)--------------------------FilterExec: p_brand@1 != Brand#45 AND p_type@2 NOT LIKE MEDIUM POLISHED% AND p_size@3 IN (SET) ([49, 14, 23, 45, 19, 3, 36, 9])
16)--------------------------FilterExec: p_brand@1 != Brand#45 AND p_size@3 IN (SET) ([49, 14, 23, 45, 19, 3, 36, 9]) AND p_type@2 NOT LIKE MEDIUM POLISHED%
17)----------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
18)------------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_brand, p_type, p_size], file_type=csv, has_header=false
19)--------------------FilterExec: s_comment@1 LIKE %Customer%Complaints%, projection=[s_suppkey@0]
Expand Down
Loading
Loading