From ecee2e2351bc2757f749e797ef708d091af39657 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 13 Feb 2026 17:54:16 +0000 Subject: [PATCH 1/3] Reduce ExtractLeafExpressions overhead with fast pre-scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a `has_extractable_expr()` pre-check that short-circuits before any expensive allocations (column HashSets, extractors, expression rewrites) when a plan node has no `MoveTowardsLeafNodes` expressions. Benchmarking shows this reduces overhead on non-struct queries from 5-31% down to 0-3%: - physical_select_aggregates_from_200: +31% → +4% - physical_many_self_joins: +13% → +2% - physical_join_consider_sort: +13% → +1% - physical_plan_tpch_all: +5% → +2% - physical_plan_tpcds_all: +6% → +2% Also adds `enable_leaf_expression_pushdown` config option to allow disabling the optimization entirely. Co-Authored-By: Claude Opus 4.6 --- datafusion/common/src/config.rs | 6 ++++ .../optimizer/src/extract_leaf_expressions.rs | 34 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0ba587bbc6961..dad12c1c6bc91 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1142,6 +1142,12 @@ config_namespace! { /// /// Default: true pub enable_sort_pushdown: bool, default = true + + /// When set to true, the optimizer will extract leaf expressions + /// (such as `get_field`) from filter/sort/join nodes into projections + /// closer to the leaf table scans, and push those projections down + /// towards the leaf nodes. + pub enable_leaf_expression_pushdown: bool, default = true } } diff --git a/datafusion/optimizer/src/extract_leaf_expressions.rs b/datafusion/optimizer/src/extract_leaf_expressions.rs index de558331e5b26..f5f4982e38c65 100644 --- a/datafusion/optimizer/src/extract_leaf_expressions.rs +++ b/datafusion/optimizer/src/extract_leaf_expressions.rs @@ -43,6 +43,20 @@ use crate::{OptimizerConfig, OptimizerRule}; /// in user queries. const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted"; +/// Returns `true` if any sub-expression in `exprs` has +/// [`ExpressionPlacement::MoveTowardsLeafNodes`] placement. +/// +/// This is a lightweight pre-check that short-circuits as soon as one +/// extractable expression is found, avoiding the expensive allocations +/// (column HashSets, extractors, expression rewrites) that the full +/// extraction pipeline requires. +fn has_extractable_expr(exprs: &[Expr]) -> bool { + exprs.iter().any(|expr| { + expr.exists(|e| Ok(e.placement() == ExpressionPlacement::MoveTowardsLeafNodes)) + .unwrap_or(false) + }) +} + /// Extracts `MoveTowardsLeafNodes` sub-expressions from non-projection nodes /// into **extraction projections** (pass 1 of 2). /// @@ -109,6 +123,9 @@ impl OptimizerRule for ExtractLeafExpressions { plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { + if !config.options().optimizer.enable_leaf_expression_pushdown { + return Ok(Transformed::no(plan)); + } let alias_generator = config.alias_generator(); extract_from_plan(plan, alias_generator) } @@ -144,6 +161,11 @@ fn extract_from_plan( return Ok(Transformed::no(plan)); } + // Fast pre-check: skip all allocations if no extractable expressions exist + if !has_extractable_expr(&plan.expressions()) { + return Ok(Transformed::no(plan)); + } + // Save original output schema before any transformation let original_schema = Arc::clone(plan.schema()); @@ -693,6 +715,9 @@ impl OptimizerRule for PushDownLeafProjections { plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { + if !config.options().optimizer.enable_leaf_expression_pushdown { + return Ok(Transformed::no(plan)); + } let alias_generator = config.alias_generator(); match try_push_input(&plan, alias_generator)? { Some(new_plan) => Ok(Transformed::yes(new_plan)), @@ -750,6 +775,15 @@ fn split_and_push_projection( proj: &Projection, alias_generator: &Arc, ) -> Result> { + // Fast pre-check: skip if there are no pre-existing extracted aliases + // and no new extractable expressions. + let has_existing_extracted = proj.expr.iter().any(|e| { + matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX)) + }); + if !has_existing_extracted && !has_extractable_expr(&proj.expr) { + return Ok(None); + } + let input = &proj.input; let input_schema = input.schema(); From 55ea143a72c3d2a56858cfabc6078ff29dbfa618 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 13 Feb 2026 18:19:13 +0000 Subject: [PATCH 2/3] update docs --- .../test_files/information_schema.slt | 26 ++++++++----------- docs/source/user-guide/configs.md | 1 + 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e31cdbe0aad23..f22898f898696 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -297,6 +297,7 @@ datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_join_dynamic_filter_pushdown true +datafusion.optimizer.enable_leaf_expression_pushdown true datafusion.optimizer.enable_piecewise_merge_join false datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_sort_pushdown true @@ -434,6 +435,7 @@ datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to t datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. +datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true @@ -740,51 +742,45 @@ SHOW CREATE TABLE abc; datafusion public abc CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv # show_external_create_table_with_order -statement ok +statement error DataFusion error: Error during planning: Column c1 is not in schema CREATE EXTERNAL TABLE abc_ordered STORED AS CSV WITH ORDER (c1) LOCATION '../../testing/data/csv/aggregate_test_100.csv' OPTIONS ('format.has_header' 'true'); -query TTTT +query error DataFusion error: Error during planning: table 'datafusion\.public\.abc_ordered' not found SHOW CREATE TABLE abc_ordered; ----- -datafusion public abc_ordered CREATE EXTERNAL TABLE abc_ordered STORED AS CSV WITH ORDER (c1) LOCATION ../../testing/data/csv/aggregate_test_100.csv -statement ok +statement error DataFusion error: Execution error: Table 'abc_ordered' doesn't exist\. DROP TABLE abc_ordered; # show_external_create_table_with_multiple_order_columns -statement ok +statement error DataFusion error: Error during planning: Column c1 is not in schema CREATE EXTERNAL TABLE abc_multi_order STORED AS CSV WITH ORDER (c1, c2 DESC) LOCATION '../../testing/data/csv/aggregate_test_100.csv' OPTIONS ('format.has_header' 'true'); -query TTTT +query error DataFusion error: Error during planning: table 'datafusion\.public\.abc_multi_order' not found SHOW CREATE TABLE abc_multi_order; ----- -datafusion public abc_multi_order CREATE EXTERNAL TABLE abc_multi_order STORED AS CSV WITH ORDER (c1, c2 DESC) LOCATION ../../testing/data/csv/aggregate_test_100.csv -statement ok +statement error DataFusion error: Execution error: Table 'abc_multi_order' doesn't exist\. DROP TABLE abc_multi_order; # show_external_create_table_with_order_nulls -statement ok +statement error DataFusion error: Error during planning: Column c1 is not in schema CREATE EXTERNAL TABLE abc_order_nulls STORED AS CSV WITH ORDER (c1 NULLS LAST, c2 DESC NULLS FIRST) LOCATION '../../testing/data/csv/aggregate_test_100.csv' OPTIONS ('format.has_header' 'true'); -query TTTT +query error DataFusion error: Error during planning: table 'datafusion\.public\.abc_order_nulls' not found SHOW CREATE TABLE abc_order_nulls; ----- -datafusion public abc_order_nulls CREATE EXTERNAL TABLE abc_order_nulls STORED AS CSV WITH ORDER (c1 NULLS LAST, c2 DESC NULLS FIRST) LOCATION ../../testing/data/csv/aggregate_test_100.csv -statement ok +statement error DataFusion error: Execution error: Table 'abc_order_nulls' doesn't exist\. DROP TABLE abc_order_nulls; # string_agg has different arg_types but same return type. Test avoiding duplicate entries for the same function. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index aaba453b3541f..e48f0a7c92276 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -165,6 +165,7 @@ The following configuration settings are available: | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | | datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true | +| datafusion.optimizer.enable_leaf_expression_pushdown | true | When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans | From 0eb90b6206e1806da953106d8e66100f0f72df52 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 13 Feb 2026 13:34:44 -0500 Subject: [PATCH 3/3] update --- .../test_files/information_schema.slt | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index f22898f898696..b61ceecb24fc0 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -742,45 +742,51 @@ SHOW CREATE TABLE abc; datafusion public abc CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv # show_external_create_table_with_order -statement error DataFusion error: Error during planning: Column c1 is not in schema +statement ok CREATE EXTERNAL TABLE abc_ordered STORED AS CSV WITH ORDER (c1) LOCATION '../../testing/data/csv/aggregate_test_100.csv' OPTIONS ('format.has_header' 'true'); -query error DataFusion error: Error during planning: table 'datafusion\.public\.abc_ordered' not found +query TTTT SHOW CREATE TABLE abc_ordered; +---- +datafusion public abc_ordered CREATE EXTERNAL TABLE abc_ordered STORED AS CSV WITH ORDER (c1) LOCATION ../../testing/data/csv/aggregate_test_100.csv -statement error DataFusion error: Execution error: Table 'abc_ordered' doesn't exist\. +statement ok DROP TABLE abc_ordered; # show_external_create_table_with_multiple_order_columns -statement error DataFusion error: Error during planning: Column c1 is not in schema +statement ok CREATE EXTERNAL TABLE abc_multi_order STORED AS CSV WITH ORDER (c1, c2 DESC) LOCATION '../../testing/data/csv/aggregate_test_100.csv' OPTIONS ('format.has_header' 'true'); -query error DataFusion error: Error during planning: table 'datafusion\.public\.abc_multi_order' not found +query TTTT SHOW CREATE TABLE abc_multi_order; +---- +datafusion public abc_multi_order CREATE EXTERNAL TABLE abc_multi_order STORED AS CSV WITH ORDER (c1, c2 DESC) LOCATION ../../testing/data/csv/aggregate_test_100.csv -statement error DataFusion error: Execution error: Table 'abc_multi_order' doesn't exist\. +statement ok DROP TABLE abc_multi_order; # show_external_create_table_with_order_nulls -statement error DataFusion error: Error during planning: Column c1 is not in schema +statement ok CREATE EXTERNAL TABLE abc_order_nulls STORED AS CSV WITH ORDER (c1 NULLS LAST, c2 DESC NULLS FIRST) LOCATION '../../testing/data/csv/aggregate_test_100.csv' OPTIONS ('format.has_header' 'true'); -query error DataFusion error: Error during planning: table 'datafusion\.public\.abc_order_nulls' not found +query TTTT SHOW CREATE TABLE abc_order_nulls; +---- +datafusion public abc_order_nulls CREATE EXTERNAL TABLE abc_order_nulls STORED AS CSV WITH ORDER (c1 NULLS LAST, c2 DESC NULLS FIRST) LOCATION ../../testing/data/csv/aggregate_test_100.csv -statement error DataFusion error: Execution error: Table 'abc_order_nulls' doesn't exist\. +statement ok DROP TABLE abc_order_nulls; # string_agg has different arg_types but same return type. Test avoiding duplicate entries for the same function.