Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,17 @@ impl OptimizerRule for ScalarSubqueryToJoin {
}

let mut all_subqueries = vec![];
#[allow(clippy::allow_attributes, clippy::mutable_key_type)]
// Expr contains Arc with interior mutability but is intentionally used as hash key
let mut expr_to_rewrite_expr_map = HashMap::new();
#[allow(clippy::allow_attributes, clippy::mutable_key_type)]
// Expr contains Arc with interior mutability but is intentionally used as hash key
let mut subquery_to_expr_map = HashMap::new();
for expr in projection.expr.iter() {
let (subqueries, rewrite_exprs) =
let mut alias_to_index: HashMap<String, usize> = HashMap::new();
let mut rewrite_exprs: Vec<Expr> =
Vec::with_capacity(projection.expr.len());
for (idx, expr) in projection.expr.iter().enumerate() {
let (subqueries, rewrite_expr) =
self.extract_subquery_exprs(expr, config.alias_generator())?;
for (subquery, _) in &subqueries {
subquery_to_expr_map.insert(subquery.clone(), expr.clone());
for (_, alias) in &subqueries {
alias_to_index.insert(alias.clone(), idx);
}
all_subqueries.extend(subqueries);
expr_to_rewrite_expr_map.insert(expr, rewrite_exprs);
rewrite_exprs.push(rewrite_expr);
}
assert_or_internal_err!(
!all_subqueries.is_empty(),
Expand All @@ -180,10 +177,9 @@ impl OptimizerRule for ScalarSubqueryToJoin {
{
cur_input = optimized_subquery;
if !expr_check_map.is_empty()
&& let Some(expr) = subquery_to_expr_map.get(&subquery)
&& let Some(rewrite_expr) = expr_to_rewrite_expr_map.get(expr)
&& let Some(&idx) = alias_to_index.get(&alias)
{
let new_expr = rewrite_expr
let new_expr = rewrite_exprs[idx]
.clone()
.transform_up(|expr| {
// replace column references with entry in map, if it exists
Expand All @@ -197,7 +193,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
}
})
.data()?;
expr_to_rewrite_expr_map.insert(expr, new_expr);
rewrite_exprs[idx] = new_expr;
}
} else {
// if we can't handle all of the subqueries then bail for now
Expand All @@ -206,14 +202,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
}

let mut proj_exprs = vec![];
for expr in projection.expr.iter() {
for (expr, new_expr) in projection.expr.iter().zip(rewrite_exprs) {
let old_expr_name = expr.schema_name().to_string();
let new_expr = expr_to_rewrite_expr_map.get(expr).unwrap();
let new_expr_name = new_expr.schema_name().to_string();
if new_expr_name != old_expr_name {
proj_exprs.push(new_expr.clone().alias(old_expr_name))
proj_exprs.push(new_expr.alias(old_expr_name))
} else {
proj_exprs.push(new_expr.clone());
proj_exprs.push(new_expr);
}
}
let new_plan = LogicalPlanBuilder::from(cur_input)
Expand Down
16 changes: 16 additions & 0 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,22 @@ SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
44 0


#correlated_scalar_subquery_count_agg_duplicated_in_projection
# Two structurally identical correlated COUNT subqueries in the same
# projection must each receive the count-bug compensation, so unmatched
# outer rows produce 0 (not NULL) on both sides.
query III rowsort
SELECT
t1_id,
(SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) + 1 AS a,
(SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) + 2 AS b
FROM t1
----
11 2 3
22 1 2
33 4 5
44 1 2

#correlated_scalar_subquery_count_agg2
query TT
explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1
Expand Down
Loading