diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0ba587bbc6961..dad12c1c6bc91 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1142,6 +1142,12 @@ config_namespace! { /// /// Default: true pub enable_sort_pushdown: bool, default = true + + /// When set to true, the optimizer will extract leaf expressions + /// (such as `get_field`) from filter/sort/join nodes into projections + /// closer to the leaf table scans, and push those projections down + /// towards the leaf nodes. + pub enable_leaf_expression_pushdown: bool, default = true } } diff --git a/datafusion/optimizer/src/extract_leaf_expressions.rs b/datafusion/optimizer/src/extract_leaf_expressions.rs index de558331e5b26..f5f4982e38c65 100644 --- a/datafusion/optimizer/src/extract_leaf_expressions.rs +++ b/datafusion/optimizer/src/extract_leaf_expressions.rs @@ -43,6 +43,20 @@ use crate::{OptimizerConfig, OptimizerRule}; /// in user queries. const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted"; +/// Returns `true` if any sub-expression in `exprs` has +/// [`ExpressionPlacement::MoveTowardsLeafNodes`] placement. +/// +/// This is a lightweight pre-check that short-circuits as soon as one +/// extractable expression is found, avoiding the expensive allocations +/// (column HashSets, extractors, expression rewrites) that the full +/// extraction pipeline requires. +fn has_extractable_expr(exprs: &[Expr]) -> bool { + exprs.iter().any(|expr| { + expr.exists(|e| Ok(e.placement() == ExpressionPlacement::MoveTowardsLeafNodes)) + .unwrap_or(false) + }) +} + /// Extracts `MoveTowardsLeafNodes` sub-expressions from non-projection nodes /// into **extraction projections** (pass 1 of 2). /// @@ -109,6 +123,9 @@ impl OptimizerRule for ExtractLeafExpressions { plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { + if !config.options().optimizer.enable_leaf_expression_pushdown { + return Ok(Transformed::no(plan)); + } let alias_generator = config.alias_generator(); extract_from_plan(plan, alias_generator) } @@ -144,6 +161,11 @@ fn extract_from_plan( return Ok(Transformed::no(plan)); } + // Fast pre-check: skip all allocations if no extractable expressions exist + if !has_extractable_expr(&plan.expressions()) { + return Ok(Transformed::no(plan)); + } + // Save original output schema before any transformation let original_schema = Arc::clone(plan.schema()); @@ -693,6 +715,9 @@ impl OptimizerRule for PushDownLeafProjections { plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { + if !config.options().optimizer.enable_leaf_expression_pushdown { + return Ok(Transformed::no(plan)); + } let alias_generator = config.alias_generator(); match try_push_input(&plan, alias_generator)? { Some(new_plan) => Ok(Transformed::yes(new_plan)), @@ -750,6 +775,15 @@ fn split_and_push_projection( proj: &Projection, alias_generator: &Arc, ) -> Result> { + // Fast pre-check: skip if there are no pre-existing extracted aliases + // and no new extractable expressions. + let has_existing_extracted = proj.expr.iter().any(|e| { + matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX)) + }); + if !has_existing_extracted && !has_extractable_expr(&proj.expr) { + return Ok(None); + } + let input = &proj.input; let input_schema = input.schema(); diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index e31cdbe0aad23..b61ceecb24fc0 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -297,6 +297,7 @@ datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_join_dynamic_filter_pushdown true +datafusion.optimizer.enable_leaf_expression_pushdown true datafusion.optimizer.enable_piecewise_merge_join false datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_sort_pushdown true @@ -434,6 +435,7 @@ datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to t datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. +datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index aaba453b3541f..e48f0a7c92276 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -165,6 +165,7 @@ The following configuration settings are available: | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | | datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true | +| datafusion.optimizer.enable_leaf_expression_pushdown | true | When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |