diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index da18d9071869..208a6a77ade3 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -21,9 +21,10 @@ use std::sync::Arc; use datafusion_common::JoinType; use datafusion_common::tree_node::Transformed; -use datafusion_common::{Result, plan_err}; +use datafusion_common::{Column, DFSchemaRef, Result, ScalarValue, plan_err}; +use datafusion_expr::expr::Cast; use datafusion_expr::logical_plan::LogicalPlan; -use datafusion_expr::{EmptyRelation, Projection, Union}; +use datafusion_expr::{EmptyRelation, Expr, Projection, Union}; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; @@ -73,12 +74,8 @@ impl OptimizerRule for PropagateEmptyRelation { Ok(Transformed::no(plan)) } LogicalPlan::Join(ref join) => { - // TODO: For Join, more join type need to be careful: - // For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side - // columns + right side columns replaced with null values. - // For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side - // columns + left side columns replaced with null values. let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?; + let left_field_count = join.left.schema().fields().len(); match join.join_type { // For Full Join, only both sides are empty, the Join result is empty. @@ -88,6 +85,24 @@ impl OptimizerRule for PropagateEmptyRelation { schema: Arc::clone(&join.schema), }), )), + // For Full Join, if one side is empty, replace with a + // Projection that null-pads the empty side's columns. 
+ JoinType::Full if right_empty => { + Ok(Transformed::yes(build_null_padded_projection( + Arc::clone(&join.left), + &join.schema, + left_field_count, + true, + )?)) + } + JoinType::Full if left_empty => { + Ok(Transformed::yes(build_null_padded_projection( + Arc::clone(&join.right), + &join.schema, + left_field_count, + false, + )?)) + } JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes( LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -100,12 +115,32 @@ impl OptimizerRule for PropagateEmptyRelation { schema: Arc::clone(&join.schema), }), )), + // Left Join with empty right: all left rows survive + // with NULLs for right columns. + JoinType::Left if right_empty => { + Ok(Transformed::yes(build_null_padded_projection( + Arc::clone(&join.left), + &join.schema, + left_field_count, + true, + )?)) + } JoinType::Right if right_empty => Ok(Transformed::yes( LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, schema: Arc::clone(&join.schema), }), )), + // Right Join with empty left: all right rows survive + // with NULLs for left columns. + JoinType::Right if left_empty => { + Ok(Transformed::yes(build_null_padded_projection( + Arc::clone(&join.right), + &join.schema, + left_field_count, + false, + )?)) + } JoinType::LeftSemi if left_empty || right_empty => Ok( Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, @@ -230,6 +265,60 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> { } } +/// Builds a Projection that replaces one side of an outer join with NULL literals. +/// +/// When one side of an outer join is an `EmptyRelation`, the join can be eliminated +/// by projecting the surviving side's columns as-is and replacing the empty side's +/// columns with `CAST(NULL AS <type>)`. 
+/// +/// The join schema is used as the projection's output schema to preserve nullability +/// guarantees (important for FULL JOIN where the surviving side's columns are marked +/// nullable in the join schema even if they aren't in the source schema). +/// +/// # Example +/// +/// For a `LEFT JOIN` where the right side is empty: +/// ```text +/// Left Join (orders.id = returns.order_id) Projection(orders.id, orders.amount, +/// ├── TableScan: orders => CAST(NULL AS Int64) AS order_id, +/// └── EmptyRelation CAST(NULL AS Utf8) AS reason) +/// └── TableScan: orders +/// ``` +fn build_null_padded_projection( + surviving_plan: Arc<LogicalPlan>, + join_schema: &DFSchemaRef, + left_field_count: usize, + empty_side_is_right: bool, +) -> Result<LogicalPlan> { + let exprs = join_schema + .iter() + .enumerate() + .map(|(i, (qualifier, field))| { + let on_empty_side = if empty_side_is_right { + i >= left_field_count + } else { + i < left_field_count + }; + + if on_empty_side { + Expr::Cast(Cast::new( + Box::new(Expr::Literal(ScalarValue::Null, None)), + field.data_type().clone(), + )) + .alias_qualified(qualifier.cloned(), field.name()) + } else { + Expr::Column(Column::new(qualifier.cloned(), field.name())) + } + }) + .collect::<Vec<_>>(); + + Ok(LogicalPlan::Projection(Projection::try_new_with_schema( + exprs, + surviving_plan, + Arc::clone(join_schema), + )?)) +} + #[cfg(test)] mod tests { @@ -570,6 +659,86 @@ mod tests { assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false) } + #[test] + fn test_left_join_right_empty_null_pad() -> Result<()> { + let left = + LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?; + let right_empty = LogicalPlanBuilder::from(test_table_scan_with_name("right")?) + .filter(lit(false))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .join_using( + right_empty, + JoinType::Left, + vec![Column::from_name("a".to_string())], + )? 
+ .build()?; + + let expected = "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left"; + assert_together_optimized_plan(plan, expected, true) + } + + #[test] + fn test_right_join_left_empty_null_pad() -> Result<()> { + let left_empty = LogicalPlanBuilder::from(test_table_scan_with_name("left")?) + .filter(lit(false))? + .build()?; + let right = + LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?; + + let plan = LogicalPlanBuilder::from(left_empty) + .join_using( + right, + JoinType::Right, + vec![Column::from_name("a".to_string())], + )? + .build()?; + + let expected = "Projection: CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c, right.a, right.b, right.c\n TableScan: right"; + assert_together_optimized_plan(plan, expected, true) + } + + #[test] + fn test_full_join_right_empty_null_pad() -> Result<()> { + let left = + LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?; + let right_empty = LogicalPlanBuilder::from(test_table_scan_with_name("right")?) + .filter(lit(false))? + .build()?; + + let plan = LogicalPlanBuilder::from(left) + .join_using( + right_empty, + JoinType::Full, + vec![Column::from_name("a".to_string())], + )? + .build()?; + + let expected = "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left"; + assert_together_optimized_plan(plan, expected, true) + } + + #[test] + fn test_full_join_left_empty_null_pad() -> Result<()> { + let left_empty = LogicalPlanBuilder::from(test_table_scan_with_name("left")?) + .filter(lit(false))? + .build()?; + let right = + LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?; + + let plan = LogicalPlanBuilder::from(left_empty) + .join_using( + right, + JoinType::Full, + vec![Column::from_name("a".to_string())], + )? 
+ .build()?; + + let expected = "Projection: CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c, right.a, right.b, right.c\n TableScan: right"; + assert_together_optimized_plan(plan, expected, true) + } + #[test] fn test_empty_with_non_empty() -> Result<()> { let table_scan = test_table_scan()?; diff --git a/datafusion/sqllogictest/test_files/propagate_empty_relation_outer_join.slt b/datafusion/sqllogictest/test_files/propagate_empty_relation_outer_join.slt new file mode 100644 index 000000000000..afc560baf0de --- /dev/null +++ b/datafusion/sqllogictest/test_files/propagate_empty_relation_outer_join.slt @@ -0,0 +1,137 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test PropagateEmptyRelation rule: outer joins where one side is +# an EmptyRelation should be replaced with a null-padded projection. 
+ +statement ok +create table t1(a int, b varchar, c double); + +statement ok +create table t2(x int, y varchar, z double); + +statement ok +insert into t1 values (1, 'a', 10.0), (2, 'b', 20.0), (3, 'c', 30.0); + +statement ok +insert into t2 values (1, 'p', 100.0), (2, 'q', 200.0); + +statement ok +set datafusion.explain.logical_plan_only = true; + +### +### LEFT JOIN with empty right (WHERE false subquery) +### + +# The join should be eliminated — no join operator in the plan +query TT +explain select * from t1 left join (select * from t2 where false) r on t1.a = r.x; +---- +logical_plan +01)Projection: t1.a, t1.b, t1.c, Int32(NULL) AS x, Utf8View(NULL) AS y, Float64(NULL) AS z +02)--TableScan: t1 projection=[a, b, c] + +# Verify result correctness — all left rows with NULLs on right +query ITRITR rowsort +select * from t1 left join (select * from t2 where false) r on t1.a = r.x; +---- +1 a 10 NULL NULL NULL +2 b 20 NULL NULL NULL +3 c 30 NULL NULL NULL + +### +### RIGHT JOIN with empty left +### + +query TT +explain select * from (select * from t1 where false) l right join t2 on l.a = t2.x; +---- +logical_plan +01)Projection: Int32(NULL) AS a, Utf8View(NULL) AS b, Float64(NULL) AS c, t2.x, t2.y, t2.z +02)--TableScan: t2 projection=[x, y, z] + +query ITRITR rowsort +select * from (select * from t1 where false) l right join t2 on l.a = t2.x; +---- +NULL NULL NULL 1 p 100 +NULL NULL NULL 2 q 200 + +### +### FULL JOIN with empty right +### + +query TT +explain select * from t1 full join (select * from t2 where false) r on t1.a = r.x; +---- +logical_plan +01)Projection: t1.a, t1.b, t1.c, Int32(NULL) AS x, Utf8View(NULL) AS y, Float64(NULL) AS z +02)--TableScan: t1 projection=[a, b, c] + +query ITRITR rowsort +select * from t1 full join (select * from t2 where false) r on t1.a = r.x; +---- +1 a 10 NULL NULL NULL +2 b 20 NULL NULL NULL +3 c 30 NULL NULL NULL + +### +### FULL JOIN with empty left +### + +query TT +explain select * from (select * from t1 where false) l 
full join t2 on l.a = t2.x; +---- +logical_plan +01)Projection: Int32(NULL) AS a, Utf8View(NULL) AS b, Float64(NULL) AS c, t2.x, t2.y, t2.z +02)--TableScan: t2 projection=[x, y, z] + +query ITRITR rowsort +select * from (select * from t1 where false) l full join t2 on l.a = t2.x; +---- +NULL NULL NULL 1 p 100 +NULL NULL NULL 2 q 200 + +### +### Filter on top of optimized join +### + +query TT +explain select * from t1 left join (select * from t2 where false) r on t1.a = r.x where t1.a > 1; +---- +logical_plan +01)Projection: t1.a, t1.b, t1.c, Int32(NULL) AS x, Utf8View(NULL) AS y, Float64(NULL) AS z +02)--Filter: t1.a > Int32(1) +03)----TableScan: t1 projection=[a, b, c] + +query ITRITR rowsort +select * from t1 left join (select * from t2 where false) r on t1.a = r.x where t1.a > 1; +---- +2 b 20 NULL NULL NULL +3 c 30 NULL NULL NULL + +### +### Cleanup +### + +statement ok +set datafusion.explain.logical_plan_only = false; + +statement ok +drop table t1; + +statement ok +drop table t2; diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index 7f88199b3c0e..41746bc38297 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -689,10 +689,8 @@ query TT explain SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1 ---- logical_plan -01)Projection: t1.t1_id, __scalar_sq_1.t2_id AS t2_id -02)--Left Join: -03)----TableScan: t1 projection=[t1_id] -04)----EmptyRelation: rows=0 +01)Projection: t1.t1_id, Int32(NULL) AS t2_id +02)--TableScan: t1 projection=[t1_id] query II rowsort SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1