Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,12 @@ config_namespace! {
///
/// Default: true
pub enable_sort_pushdown: bool, default = true

/// When set to true, the optimizer will extract leaf expressions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

/// (such as `get_field`) from filter/sort/join nodes into projections
/// closer to the leaf table scans, and push those projections down
/// towards the leaf nodes.
pub enable_leaf_expression_pushdown: bool, default = true
}
}

Expand Down
34 changes: 34 additions & 0 deletions datafusion/optimizer/src/extract_leaf_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ use crate::{OptimizerConfig, OptimizerRule};
/// in user queries.
const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";

/// Returns `true` if any sub-expression in `exprs` has
/// [`ExpressionPlacement::MoveTowardsLeafNodes`] placement.
///
/// This is a lightweight pre-check that short-circuits as soon as one
/// extractable expression is found, avoiding the expensive allocations
/// (column HashSets, extractors, expression rewrites) that the full
/// extraction pipeline requires.
fn has_extractable_expr(exprs: &[Expr]) -> bool {
exprs.iter().any(|expr| {
expr.exists(|e| Ok(e.placement() == ExpressionPlacement::MoveTowardsLeafNodes))
.unwrap_or(false)
})
}

/// Extracts `MoveTowardsLeafNodes` sub-expressions from non-projection nodes
/// into **extraction projections** (pass 1 of 2).
///
Expand Down Expand Up @@ -109,6 +123,9 @@ impl OptimizerRule for ExtractLeafExpressions {
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
if !config.options().optimizer.enable_leaf_expression_pushdown {
return Ok(Transformed::no(plan));
}
let alias_generator = config.alias_generator();
extract_from_plan(plan, alias_generator)
}
Expand Down Expand Up @@ -144,6 +161,11 @@ fn extract_from_plan(
return Ok(Transformed::no(plan));
}

// Fast pre-check: skip all allocations if no extractable expressions exist
if !has_extractable_expr(&plan.expressions()) {
return Ok(Transformed::no(plan));
}

// Save original output schema before any transformation
let original_schema = Arc::clone(plan.schema());

Expand Down Expand Up @@ -693,6 +715,9 @@ impl OptimizerRule for PushDownLeafProjections {
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
if !config.options().optimizer.enable_leaf_expression_pushdown {
return Ok(Transformed::no(plan));
}
let alias_generator = config.alias_generator();
match try_push_input(&plan, alias_generator)? {
Some(new_plan) => Ok(Transformed::yes(new_plan)),
Expand Down Expand Up @@ -750,6 +775,15 @@ fn split_and_push_projection(
proj: &Projection,
alias_generator: &Arc<AliasGenerator>,
) -> Result<Option<LogicalPlan>> {
// Fast pre-check: skip if there are no pre-existing extracted aliases
// and no new extractable expressions.
let has_existing_extracted = proj.expr.iter().any(|e| {
matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX))
});
if !has_existing_extracted && !has_extractable_expr(&proj.expr) {
return Ok(None);
}

let input = &proj.input;
let input_schema = input.schema();

Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_dynamic_filter_pushdown true
datafusion.optimizer.enable_join_dynamic_filter_pushdown true
datafusion.optimizer.enable_leaf_expression_pushdown true
datafusion.optimizer.enable_piecewise_merge_join false
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_sort_pushdown true
Expand Down Expand Up @@ -434,6 +435,7 @@ datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to t
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase.
datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes.
datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ The following configuration settings are available:
| datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
| datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
| datafusion.optimizer.enable_sort_pushdown | true | Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true |
| datafusion.optimizer.enable_leaf_expression_pushdown | true | When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down