Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 176 additions & 7 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ use std::sync::Arc;

use datafusion_common::JoinType;
use datafusion_common::tree_node::Transformed;
use datafusion_common::{Result, plan_err};
use datafusion_common::{Column, DFSchemaRef, Result, ScalarValue, plan_err};
use datafusion_expr::expr::Cast;
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{EmptyRelation, Projection, Union};
use datafusion_expr::{EmptyRelation, Expr, Projection, Union};

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
Expand Down Expand Up @@ -73,12 +74,8 @@ impl OptimizerRule for PropagateEmptyRelation {
Ok(Transformed::no(plan))
}
LogicalPlan::Join(ref join) => {
// TODO: For Join, more join type need to be careful:
// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
// columns + right side columns replaced with null values.
// For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
// columns + left side columns replaced with null values.
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
let left_field_count = join.left.schema().fields().len();

match join.join_type {
// For Full Join, only both sides are empty, the Join result is empty.
Expand All @@ -88,6 +85,24 @@ impl OptimizerRule for PropagateEmptyRelation {
schema: Arc::clone(&join.schema),
}),
)),
// For Full Join, if one side is empty, replace with a
// Projection that null-pads the empty side's columns.
JoinType::Full if right_empty => {
Ok(Transformed::yes(build_null_padded_projection(
Arc::clone(&join.left),
&join.schema,
left_field_count,
true,
)?))
}
JoinType::Full if left_empty => {
Ok(Transformed::yes(build_null_padded_projection(
Arc::clone(&join.right),
&join.schema,
left_field_count,
false,
)?))
}
JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
Expand All @@ -100,12 +115,32 @@ impl OptimizerRule for PropagateEmptyRelation {
schema: Arc::clone(&join.schema),
}),
)),
// Left Join with empty right: all left rows survive
// with NULLs for right columns.
JoinType::Left if right_empty => {
Ok(Transformed::yes(build_null_padded_projection(
Arc::clone(&join.left),
&join.schema,
left_field_count,
true,
)?))
}
JoinType::Right if right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::clone(&join.schema),
}),
)),
// Right Join with empty left: all right rows survive
// with NULLs for left columns.
JoinType::Right if left_empty => {
Ok(Transformed::yes(build_null_padded_projection(
Arc::clone(&join.right),
&join.schema,
left_field_count,
false,
)?))
}
JoinType::LeftSemi if left_empty || right_empty => Ok(
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
Expand Down Expand Up @@ -230,6 +265,60 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
}
}

/// Builds a Projection that replaces one side of an outer join with NULL literals.
///
/// When one side of an outer join is an `EmptyRelation`, the join can be eliminated
/// by projecting the surviving side's columns as-is and replacing the empty side's
/// columns with `CAST(NULL AS <type>)`.
///
/// The join schema is used as the projection's output schema to preserve nullability
/// guarantees (important for FULL JOIN where the surviving side's columns are marked
/// nullable in the join schema even if they aren't in the source schema).
///
/// # Example
///
/// For a `LEFT JOIN` where the right side is empty:
/// ```text
/// Left Join (orders.id = returns.order_id) Projection(orders.id, orders.amount,
/// ├── TableScan: orders => CAST(NULL AS Int64) AS order_id,
/// └── EmptyRelation CAST(NULL AS Utf8) AS reason)
/// └── TableScan: orders
/// ```
fn build_null_padded_projection(
surviving_plan: Arc<LogicalPlan>,
join_schema: &DFSchemaRef,
left_field_count: usize,
empty_side_is_right: bool,
) -> Result<LogicalPlan> {
let exprs = join_schema
.iter()
.enumerate()
.map(|(i, (qualifier, field))| {
let on_empty_side = if empty_side_is_right {
i >= left_field_count
} else {
i < left_field_count
};

if on_empty_side {
Expr::Cast(Cast::new(
Box::new(Expr::Literal(ScalarValue::Null, None)),
field.data_type().clone(),
))
.alias_qualified(qualifier.cloned(), field.name())
} else {
Expr::Column(Column::new(qualifier.cloned(), field.name()))
}
})
.collect::<Vec<_>>();

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
exprs,
surviving_plan,
Arc::clone(join_schema),
)?))
}

#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -570,6 +659,86 @@ mod tests {
assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false)
}

#[test]
fn test_left_join_right_empty_null_pad() -> Result<()> {
let left =
LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?;
let right_empty = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
.filter(lit(false))?
.build()?;

let plan = LogicalPlanBuilder::from(left)
.join_using(
right_empty,
JoinType::Left,
vec![Column::from_name("a".to_string())],
)?
.build()?;

let expected = "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left";
assert_together_optimized_plan(plan, expected, true)
}

#[test]
fn test_right_join_left_empty_null_pad() -> Result<()> {
let left_empty = LogicalPlanBuilder::from(test_table_scan_with_name("left")?)
.filter(lit(false))?
.build()?;
let right =
LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?;

let plan = LogicalPlanBuilder::from(left_empty)
.join_using(
right,
JoinType::Right,
vec![Column::from_name("a".to_string())],
)?
.build()?;

let expected = "Projection: CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c, right.a, right.b, right.c\n TableScan: right";
assert_together_optimized_plan(plan, expected, true)
}

#[test]
fn test_full_join_right_empty_null_pad() -> Result<()> {
let left =
LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?;
let right_empty = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
.filter(lit(false))?
.build()?;

let plan = LogicalPlanBuilder::from(left)
.join_using(
right_empty,
JoinType::Full,
vec![Column::from_name("a".to_string())],
)?
.build()?;

let expected = "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left";
assert_together_optimized_plan(plan, expected, true)
}

#[test]
fn test_full_join_left_empty_null_pad() -> Result<()> {
let left_empty = LogicalPlanBuilder::from(test_table_scan_with_name("left")?)
.filter(lit(false))?
.build()?;
let right =
LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?;

let plan = LogicalPlanBuilder::from(left_empty)
.join_using(
right,
JoinType::Full,
vec![Column::from_name("a".to_string())],
)?
.build()?;

let expected = "Projection: CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c, right.a, right.b, right.c\n TableScan: right";
assert_together_optimized_plan(plan, expected, true)
}

#[test]
fn test_empty_with_non_empty() -> Result<()> {
let table_scan = test_table_scan()?;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Test PropagateEmptyRelation rule: outer joins where one side is
# an EmptyRelation should be replaced with a null-padded projection.

statement ok
create table t1(a int, b varchar, c double);

statement ok
create table t2(x int, y varchar, z double);

statement ok
insert into t1 values (1, 'a', 10.0), (2, 'b', 20.0), (3, 'c', 30.0);

statement ok
insert into t2 values (1, 'p', 100.0), (2, 'q', 200.0);

statement ok
set datafusion.explain.logical_plan_only = true;

###
### LEFT JOIN with empty right (WHERE false subquery)
###

# The join should be eliminated — no join operator in the plan
query TT
explain select * from t1 left join (select * from t2 where false) r on t1.a = r.x;
----
logical_plan
01)Projection: t1.a, t1.b, t1.c, Int32(NULL) AS x, Utf8View(NULL) AS y, Float64(NULL) AS z
02)--TableScan: t1 projection=[a, b, c]

# Verify result correctness — all left rows with NULLs on right
query ITRITR rowsort
select * from t1 left join (select * from t2 where false) r on t1.a = r.x;
----
1 a 10 NULL NULL NULL
2 b 20 NULL NULL NULL
3 c 30 NULL NULL NULL

###
### RIGHT JOIN with empty left
###

query TT
explain select * from (select * from t1 where false) l right join t2 on l.a = t2.x;
----
logical_plan
01)Projection: Int32(NULL) AS a, Utf8View(NULL) AS b, Float64(NULL) AS c, t2.x, t2.y, t2.z
02)--TableScan: t2 projection=[x, y, z]

query ITRITR rowsort
select * from (select * from t1 where false) l right join t2 on l.a = t2.x;
----
NULL NULL NULL 1 p 100
NULL NULL NULL 2 q 200

###
### FULL JOIN with empty right
###

query TT
explain select * from t1 full join (select * from t2 where false) r on t1.a = r.x;
----
logical_plan
01)Projection: t1.a, t1.b, t1.c, Int32(NULL) AS x, Utf8View(NULL) AS y, Float64(NULL) AS z
02)--TableScan: t1 projection=[a, b, c]

query ITRITR rowsort
select * from t1 full join (select * from t2 where false) r on t1.a = r.x;
----
1 a 10 NULL NULL NULL
2 b 20 NULL NULL NULL
3 c 30 NULL NULL NULL

###
### FULL JOIN with empty left
###

query TT
explain select * from (select * from t1 where false) l full join t2 on l.a = t2.x;
----
logical_plan
01)Projection: Int32(NULL) AS a, Utf8View(NULL) AS b, Float64(NULL) AS c, t2.x, t2.y, t2.z
02)--TableScan: t2 projection=[x, y, z]

query ITRITR rowsort
select * from (select * from t1 where false) l full join t2 on l.a = t2.x;
----
NULL NULL NULL 1 p 100
NULL NULL NULL 2 q 200

###
### Filter on top of optimized join
###

query TT
explain select * from t1 left join (select * from t2 where false) r on t1.a = r.x where t1.a > 1;
----
logical_plan
01)Projection: t1.a, t1.b, t1.c, Int32(NULL) AS x, Utf8View(NULL) AS y, Float64(NULL) AS z
02)--Filter: t1.a > Int32(1)
03)----TableScan: t1 projection=[a, b, c]

query ITRITR rowsort
select * from t1 left join (select * from t2 where false) r on t1.a = r.x where t1.a > 1;
----
2 b 20 NULL NULL NULL
3 c 30 NULL NULL NULL

###
### Cleanup
###

statement ok
set datafusion.explain.logical_plan_only = false;

statement ok
drop table t1;

statement ok
drop table t2;
6 changes: 2 additions & 4 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -689,10 +689,8 @@ query TT
explain SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1
----
logical_plan
01)Projection: t1.t1_id, __scalar_sq_1.t2_id AS t2_id
02)--Left Join:
03)----TableScan: t1 projection=[t1_id]
04)----EmptyRelation: rows=0
01)Projection: t1.t1_id, Int32(NULL) AS t2_id
02)--TableScan: t1 projection=[t1_id]

query II rowsort
SELECT t1_id, (SELECT t2_id FROM t2 limit 0) FROM t1
Expand Down
Loading