diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 941fcffb798f..76d22c7fb374 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -153,20 +153,17 @@ impl OptimizerRule for ScalarSubqueryToJoin { } let mut all_subqueries = vec![]; - #[allow(clippy::allow_attributes, clippy::mutable_key_type)] - // Expr contains Arc with interior mutability but is intentionally used as hash key - let mut expr_to_rewrite_expr_map = HashMap::new(); - #[allow(clippy::allow_attributes, clippy::mutable_key_type)] - // Expr contains Arc with interior mutability but is intentionally used as hash key - let mut subquery_to_expr_map = HashMap::new(); - for expr in projection.expr.iter() { - let (subqueries, rewrite_exprs) = + let mut alias_to_index: HashMap = HashMap::new(); + let mut rewrite_exprs: Vec = + Vec::with_capacity(projection.expr.len()); + for (idx, expr) in projection.expr.iter().enumerate() { + let (subqueries, rewrite_expr) = self.extract_subquery_exprs(expr, config.alias_generator())?; - for (subquery, _) in &subqueries { - subquery_to_expr_map.insert(subquery.clone(), expr.clone()); + for (_, alias) in &subqueries { + alias_to_index.insert(alias.clone(), idx); } all_subqueries.extend(subqueries); - expr_to_rewrite_expr_map.insert(expr, rewrite_exprs); + rewrite_exprs.push(rewrite_expr); } assert_or_internal_err!( !all_subqueries.is_empty(), @@ -180,10 +177,9 @@ impl OptimizerRule for ScalarSubqueryToJoin { { cur_input = optimized_subquery; if !expr_check_map.is_empty() - && let Some(expr) = subquery_to_expr_map.get(&subquery) - && let Some(rewrite_expr) = expr_to_rewrite_expr_map.get(expr) + && let Some(&idx) = alias_to_index.get(&alias) { - let new_expr = rewrite_expr + let new_expr = rewrite_exprs[idx] .clone() .transform_up(|expr| { // replace column references with entry in map, if it exists @@ -197,7 +193,7 @@ impl OptimizerRule for ScalarSubqueryToJoin { } }) .data()?; - expr_to_rewrite_expr_map.insert(expr, new_expr); + rewrite_exprs[idx] = new_expr; } } else { // if we can't handle all of the subqueries then bail for now @@ -206,14 +202,13 @@ impl OptimizerRule for ScalarSubqueryToJoin { } let mut proj_exprs = vec![]; - for expr in projection.expr.iter() { + for (expr, new_expr) in projection.expr.iter().zip(rewrite_exprs) { let old_expr_name = expr.schema_name().to_string(); - let new_expr = expr_to_rewrite_expr_map.get(expr).unwrap(); let new_expr_name = new_expr.schema_name().to_string(); if new_expr_name != old_expr_name { - proj_exprs.push(new_expr.clone().alias(old_expr_name)) + proj_exprs.push(new_expr.alias(old_expr_name)) } else { - proj_exprs.push(new_expr.clone()); + proj_exprs.push(new_expr); } } let new_plan = LogicalPlanBuilder::from(cur_input) diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index f56a8a10d2e6..47155386e68e 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -851,6 +851,22 @@ SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1 44 0 +#correlated_scalar_subquery_count_agg_duplicated_in_projection +# Two structurally identical correlated COUNT subqueries in the same +# projection must each receive the count-bug compensation, so unmatched +# outer rows produce 0 (not NULL) on both sides. +query III rowsort +SELECT + t1_id, + (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) + 1 AS a, + (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) + 2 AS b +FROM t1 +---- +11 2 3 +22 1 2 +33 4 5 +44 1 2 + #correlated_scalar_subquery_count_agg2 query TT explain SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) as cnt from t1