Skip to content
72 changes: 71 additions & 1 deletion datafusion/physical-expr/src/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,13 @@ mod tests {
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, assert_contains};
use datafusion_common::stats::Precision;
use datafusion_common::{DFSchema, Result, ScalarValue, assert_contains};
use datafusion_expr::{
Expr, col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit,
};

use crate::expressions::Column;
use crate::{AnalysisContext, create_physical_expr};

use super::{ExprBoundaries, analyze};
Expand Down Expand Up @@ -435,4 +437,72 @@ mod tests {
.unwrap_err();
assert_contains!(analysis_error.to_string(), expected_error);
}

/// Analyzes `NOT (a = 0.0)` for a Float32 column `a` whose known range is
/// `[-1.0, 1.0]` and checks that the reported interval for `a` is still
/// `[-1.0, 1.0]` afterwards.
#[test]
fn analyze_not_eq_around() -> Result<()> {
    let arrow_schema = Arc::new(Schema::new(vec![make_field("a", DataType::Float32)]));

    // Starting range of column `a`; the very same interval doubles as the
    // expected result once the analysis has run.
    let initial_range = Interval::try_new(
        ScalarValue::Float32(Some(-1.0)),
        ScalarValue::Float32(Some(1.0)),
    )?;
    let input_boundaries = vec![ExprBoundaries {
        column: Column::new("a", 0),
        interval: Some(initial_range.clone()),
        distinct_count: Precision::Absent,
    }];

    // NOT (a = 0.0)
    let predicate = datafusion_expr::not(col("a").eq(lit(0.0f32)));

    let df_schema = DFSchema::try_from(Arc::clone(&arrow_schema))?;
    let physical_predicate =
        create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

    let analyzed = analyze(
        &physical_predicate,
        AnalysisContext::new(input_boundaries),
        df_schema.as_ref(),
    )?;

    // The bounds of `a` are expected to come back unchanged.
    let actual = analyzed.boundaries[0].interval.clone();
    let expected = Some(initial_range);

    assert_eq!(expected, actual);
    Ok(())
}

/// Analyzes `NOT (a = 0.0)` for a Float32 column `a` whose known range is the
/// single point `[0.0, 0.0]` and checks that the analysis reports the
/// predicate as infeasible: selectivity `0.0` and no interval on any
/// returned boundary.
#[test]
fn analyze_not_eq_infeasible() -> Result<()> {
    let arrow_schema = Arc::new(Schema::new(vec![make_field("a", DataType::Float32)]));

    // Column `a` is pinned to exactly 0.0.
    let single_point = Interval::try_new(
        ScalarValue::Float32(Some(0.0)),
        ScalarValue::Float32(Some(0.0)),
    )?;
    let input_boundaries = vec![ExprBoundaries {
        column: Column::new("a", 0),
        interval: Some(single_point),
        distinct_count: Precision::Absent,
    }];

    // NOT (a = 0.0) -> Infeasible when a is known to be 0.0
    let predicate = datafusion_expr::not(col("a").eq(lit(0.0f32)));

    let df_schema = DFSchema::try_from(Arc::clone(&arrow_schema))?;
    let physical_predicate =
        create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

    let analyzed = analyze(
        &physical_predicate,
        AnalysisContext::new(input_boundaries),
        df_schema.as_ref(),
    )?;

    // Should be infeasible (selectivity 0.0 and None intervals)
    assert_eq!(analyzed.selectivity, Some(0.0));
    assert!(
        analyzed
            .boundaries
            .iter()
            .all(|boundary| boundary.interval.is_none())
    );
    Ok(())
}
}
13 changes: 11 additions & 2 deletions datafusion/physical-expr/src/intervals/cp_solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,17 @@ pub fn propagate_comparison(
} else if parent == &Interval::FALSE {
match op {
Operator::Eq => {
// TODO: Propagation is not possible until we support interval sets.
Ok(None)
// If the intervals are the same and single points, then equality
// is certain. Thus, inequality is impossible.
if !left_child.is_unbounded()
&& left_child == right_child
&& left_child.lower() == left_child.upper()
{
Ok(None)
} else {
// TODO: Propagation is not possible until we support interval sets.
Ok(Some((left_child.clone(), right_child.clone())))
}
}
Operator::Gt => satisfy_greater(right_child, left_child, false),
Operator::GtEq => satisfy_greater(right_child, left_child, true),
Expand Down