diff --git a/datafusion/physical-expr-common/src/datum.rs b/datafusion/physical-expr-common/src/datum.rs
index bd5790507f662..6098e938091ea 100644
--- a/datafusion/physical-expr-common/src/datum.rs
+++ b/datafusion/physical-expr-common/src/datum.rs
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::ArrowNativeTypeOp;
 use arrow::array::BooleanArray;
-use arrow::array::{ArrayRef, Datum, make_comparator};
+use arrow::array::types::*;
+use arrow::array::{
+    Array, ArrayRef, ArrowPrimitiveType, Datum, PrimitiveArray, make_comparator,
+};
 use arrow::buffer::{BooleanBuffer, NullBuffer};
 use arrow::compute::kernels::cmp::{
     distinct, eq, gt, gt_eq, lt, lt_eq, neq, not_distinct,
@@ -55,6 +59,437 @@ pub fn apply(
     }
 }
 
+/// Arithmetic operations accepted by [`apply_arithmetic`]; only some run in-place.
+#[derive(Debug, Copy, Clone)]
+pub enum ArithmeticOp {
+    Add, // `add` kernel (checked; always evaluated by the kernel)
+    AddWrapping, // `add_wrapping` kernel; eligible for in-place evaluation
+    Sub, // `sub` kernel (checked; always evaluated by the kernel)
+    SubWrapping, // `sub_wrapping` kernel; eligible for in-place evaluation
+    Mul, // `mul` kernel (checked; always evaluated by the kernel)
+    MulWrapping, // `mul_wrapping` kernel; eligible for in-place evaluation
+    Div, // `div` kernel (always evaluated by the kernel)
+    Rem, // `rem` kernel; in-place only for `array % non-zero scalar`
+}
+
+/// Like [`apply`], but takes ownership of `ColumnarValue` inputs to enable
+/// in-place buffer reuse for arithmetic on primitive arrays.
+///
+/// When either operand is an integer or float primitive array whose buffer is
+/// not shared, wrapping add/sub/mul (and `Rem` by a non-zero scalar divisor)
+/// are computed in-place via [`PrimitiveArray::unary_mut`] or
+/// [`PrimitiveArray::into_builder`], avoiding a buffer allocation. Checked ops,
+/// `Div`, decimals, shared buffers, and mismatched types fall back to the
+/// standard Arrow compute kernel.
+pub fn apply_arithmetic(
+    lhs: ColumnarValue,
+    rhs: ColumnarValue,
+    op: ArithmeticOp,
+) -> Result<ColumnarValue> {
+    let f = arithmetic_op_to_fn(op);
+    match (lhs, rhs) {
+        (ColumnarValue::Array(left), ColumnarValue::Array(right)) => {
+            // Reuse left's buffer if possible, else right's, else run kernel `f`
+            match try_apply_inplace_array(left, &right, op) {
+                Ok(result) => Ok(ColumnarValue::Array(result)),
+                Err(left) => match try_apply_inplace_array_rhs(&left, right, op) {
+                    Ok(result) => Ok(ColumnarValue::Array(result)),
+                    Err(right) => Ok(ColumnarValue::Array(f(&left, &right)?)),
+                },
+            }
+        }
+        (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => {
+            // Scalar is the left operand: mutate right's buffer, else run `f`
+            match try_apply_inplace_scalar_rhs(right, &left, op) {
+                Ok(result) => Ok(ColumnarValue::Array(result)),
+                Err(right) => Ok(ColumnarValue::Array(f(&left.to_scalar()?, &right)?)),
+            }
+        }
+        (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => {
+            // Scalar is the right operand: mutate left's buffer, else run `f`
+            match try_apply_inplace_scalar(left, &right, op) {
+                Ok(result) => Ok(ColumnarValue::Array(result)),
+                Err(left) => Ok(ColumnarValue::Array(f(&left, &right.to_scalar()?)?)),
+            }
+        }
+        (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => {
+            let array = f(&left.to_scalar()?, &right.to_scalar()?)?;
+            let scalar = ScalarValue::try_from_array(array.as_ref(), 0)?;
+            Ok(ColumnarValue::Scalar(scalar))
+        }
+    }
+}
+
+fn arithmetic_op_to_fn(
+    op: ArithmeticOp,
+) -> fn(&dyn Datum, &dyn Datum) -> Result<ArrayRef, ArrowError> {
+    use arrow::compute::kernels::numeric::*;
+    match op {
+        ArithmeticOp::Add => add,
+        ArithmeticOp::AddWrapping => add_wrapping,
+        ArithmeticOp::Sub => sub,
+        ArithmeticOp::SubWrapping => sub_wrapping,
+        ArithmeticOp::Mul => mul,
+        ArithmeticOp::MulWrapping => mul_wrapping,
+        ArithmeticOp::Div => div,
+        ArithmeticOp::Rem => rem,
+    }
+}
+
+/// Dispatches an in-place unary (array op scalar) operation across all supported primitive types.
+/// `$arr` is the ArrayRef, `$scalar_value` is the &ScalarValue, `$op` is ArithmeticOp,
+/// `$fn_name` is the function to call (try_inplace_unary or try_inplace_unary_rhs).
+macro_rules! dispatch_inplace_unary {
+    ($arr:expr, $scalar_value:expr, $op:expr, $fn_name:ident) => {{
+        macro_rules! do_dispatch {
+            ($arrow_type:ty, $arr_inner:expr) => {{
+                let scalar_val = $scalar_value
+                    .to_scalar()
+                    .map_err(|_| Arc::clone(&$arr_inner))?;
+                let scalar_arr = scalar_val.get().0;
+                let rhs_val = scalar_arr
+                    .as_any()
+                    .downcast_ref::<PrimitiveArray<$arrow_type>>()
+                    .ok_or_else(|| Arc::clone(&$arr_inner))?
+                    .value(0);
+                $fn_name::<$arrow_type>($arr_inner, rhs_val, $op)
+            }};
+        }
+        match $arr.data_type() {
+            dt if dt == &<Int8Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(Int8Type, $arr)
+            }
+            dt if dt == &<Int16Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(Int16Type, $arr)
+            }
+            dt if dt == &<Int32Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(Int32Type, $arr)
+            }
+            dt if dt == &<Int64Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(Int64Type, $arr)
+            }
+            dt if dt == &<UInt8Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(UInt8Type, $arr)
+            }
+            dt if dt == &<UInt16Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(UInt16Type, $arr)
+            }
+            dt if dt == &<UInt32Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(UInt32Type, $arr)
+            }
+            dt if dt == &<UInt64Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(UInt64Type, $arr)
+            }
+            dt if dt == &<Float16Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(Float16Type, $arr)
+            }
+            dt if dt == &<Float32Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(Float32Type, $arr)
+            }
+            dt if dt == &<Float64Type as ArrowPrimitiveType>::DATA_TYPE => {
+                do_dispatch!(Float64Type, $arr)
+            }
+            // Decimal types excluded: result precision/scale differs from input
+            _ => Err($arr),
+        }
+    }};
+}
+
+/// Dispatches an in-place binary (array op array) operation across all supported primitive types.
+/// `$arr` is the ArrayRef to mutate, `$other` is the other ArrayRef, `$op` is ArithmeticOp,
+/// `$fn_name` is the function to call (try_inplace_binary or try_inplace_binary_rhs).
+macro_rules! dispatch_inplace_binary {
+    ($arr:expr, $other:expr, $op:expr, $fn_name:ident) => {{
+        match $arr.data_type() {
+            dt if dt == &<Int8Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<Int8Type>($arr, $other, $op)
+            }
+            dt if dt == &<Int16Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<Int16Type>($arr, $other, $op)
+            }
+            dt if dt == &<Int32Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<Int32Type>($arr, $other, $op)
+            }
+            dt if dt == &<Int64Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<Int64Type>($arr, $other, $op)
+            }
+            dt if dt == &<UInt8Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<UInt8Type>($arr, $other, $op)
+            }
+            dt if dt == &<UInt16Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<UInt16Type>($arr, $other, $op)
+            }
+            dt if dt == &<UInt32Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<UInt32Type>($arr, $other, $op)
+            }
+            dt if dt == &<UInt64Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<UInt64Type>($arr, $other, $op)
+            }
+            dt if dt == &<Float16Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<Float16Type>($arr, $other, $op)
+            }
+            dt if dt == &<Float32Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<Float32Type>($arr, $other, $op)
+            }
+            dt if dt == &<Float64Type as ArrowPrimitiveType>::DATA_TYPE => {
+                $fn_name::<Float64Type>($arr, $other, $op)
+            }
+            // Decimal types excluded: result precision/scale differs from input
+            _ => Err($arr),
+        }
+    }};
+}
+
+/// Try to apply arithmetic in-place on `left` array with a scalar `right`.
+/// Returns `Ok(result)` on success, or `Err(left)` if `right` is null or in-place is not possible.
+fn try_apply_inplace_scalar(
+    left: ArrayRef,
+    right: &ScalarValue,
+    op: ArithmeticOp,
+) -> Result<ArrayRef, ArrayRef> {
+    if right.is_null() {
+        return Err(left);
+    }
+    dispatch_inplace_unary!(left, right, op, try_inplace_unary)
+}
+
+/// Try to apply arithmetic in-place on `left` array using values from `right` array.
+/// Returns `Ok(result)`, or `Err(left)` when types/lengths differ or in-place is not possible.
+fn try_apply_inplace_array(
+    left: ArrayRef,
+    right: &ArrayRef,
+    op: ArithmeticOp,
+) -> Result<ArrayRef, ArrayRef> {
+    if left.data_type() != right.data_type() || left.len() != right.len() {
+        return Err(left);
+    }
+    dispatch_inplace_binary!(left, right, op, try_inplace_binary)
+}
+
+/// Attempt in-place unary (array op scalar) mutation on a PrimitiveArray.
+fn try_inplace_unary<T: ArrowPrimitiveType>(
+    array: ArrayRef,
+    scalar: T::Native,
+    op: ArithmeticOp,
+) -> Result<ArrayRef, ArrayRef>
+where
+    T::Native: ArrowNativeTypeOp,
+{
+    // Only attempt in-place for wrapping (infallible) operations.
+    // Checked ops (Add, Sub, Mul, Div) can fail mid-way, corrupting the buffer.
+    // Rem with zero divisor must also fall back for proper error reporting.
+    type BinFn<N> = fn(N, N) -> N;
+    let op_fn: Option<BinFn<T::Native>> = match op {
+        ArithmeticOp::AddWrapping => Some(ArrowNativeTypeOp::add_wrapping),
+        ArithmeticOp::SubWrapping => Some(ArrowNativeTypeOp::sub_wrapping),
+        ArithmeticOp::MulWrapping => Some(ArrowNativeTypeOp::mul_wrapping),
+        ArithmeticOp::Rem if !scalar.is_zero() => Some(ArrowNativeTypeOp::mod_wrapping),
+        _ => None,
+    };
+
+    let Some(op_fn) = op_fn else {
+        return Err(array);
+    };
+
+    // Clone the PrimitiveArray (cheap — shares the buffer via Arc)
+    let primitive = array
+        .as_any()
+        .downcast_ref::<PrimitiveArray<T>>()
+        .ok_or_else(|| Arc::clone(&array))?
+        .clone();
+    // Drop the ArrayRef so the buffer's refcount can drop to 1
+    drop(array);
+
+    match primitive.unary_mut(|v| op_fn(v, scalar)) {
+        Ok(result) => Ok(Arc::new(result)),
+        Err(arr) => Err(Arc::new(arr)),
+    }
+}
+
+/// Attempt in-place binary (array op array) mutation on a PrimitiveArray.
+fn try_inplace_binary<T: ArrowPrimitiveType>(
+    left: ArrayRef,
+    right: &ArrayRef,
+    op: ArithmeticOp,
+) -> Result<ArrayRef, ArrayRef>
+where
+    T::Native: ArrowNativeTypeOp,
+{
+    // Only wrapping (infallible) ops run in-place; reject others before touching `left`.
+    type BinFn<N> = fn(N, N) -> N;
+    let op_fn: Option<BinFn<T::Native>> = match op {
+        ArithmeticOp::AddWrapping => Some(ArrowNativeTypeOp::add_wrapping),
+        ArithmeticOp::SubWrapping => Some(ArrowNativeTypeOp::sub_wrapping),
+        ArithmeticOp::MulWrapping => Some(ArrowNativeTypeOp::mul_wrapping),
+        _ => None,
+    };
+
+    let Some(op_fn) = op_fn else {
+        return Err(left);
+    };
+
+    let right_primitive = right
+        .as_any()
+        .downcast_ref::<PrimitiveArray<T>>()
+        .ok_or_else(|| Arc::clone(&left))?;
+
+    let left_primitive = left
+        .as_any()
+        .downcast_ref::<PrimitiveArray<T>>()
+        .ok_or_else(|| Arc::clone(&left))?
+        .clone();
+    drop(left);
+
+    let mut builder = match left_primitive.into_builder() {
+        Ok(b) => b,
+        Err(arr) => return Err(Arc::new(arr)),
+    };
+
+    let left_slice = builder.values_slice_mut();
+    let right_values = right_primitive.values();
+
+    left_slice
+        .iter_mut()
+        .zip(right_values.iter())
+        .for_each(|(l, r)| *l = op_fn(*l, *r));
+
+    // Merge null buffers from both sides
+    let result = builder.finish();
+    if right_primitive.nulls().is_some() {
+        let merged = NullBuffer::union(result.nulls(), right_primitive.nulls());
+        let result = PrimitiveArray::<T>::new(result.values().clone(), merged);
+        Ok(Arc::new(result))
+    } else {
+        Ok(Arc::new(result))
+    }
+}
+
+/// Try to apply arithmetic in-place on `right` array with a scalar `left`.
+/// The operation is `result\[i\] = op(scalar_left, right\[i\])`, stored in `right`'s buffer.
+/// Returns `Err(right)` for a null scalar, for `Rem` (elements are the divisors), or if not possible.
+fn try_apply_inplace_scalar_rhs(
+    right: ArrayRef,
+    left: &ScalarValue,
+    op: ArithmeticOp,
+) -> Result<ArrayRef, ArrayRef> {
+    if left.is_null() || matches!(op, ArithmeticOp::Rem) {
+        return Err(right);
+    }
+    dispatch_inplace_unary!(right, left, op, try_inplace_unary_rhs)
+}
+
+/// Try to apply arithmetic in-place on `right` array using values from `left` array.
+/// The operation is `result\[i\] = op(left\[i\], right\[i\])`, stored in `right`'s buffer.
+/// Returns `Ok(result)`, or `Err(right)` when types/lengths differ or in-place is not possible.
+fn try_apply_inplace_array_rhs(
+    left: &ArrayRef,
+    right: ArrayRef,
+    op: ArithmeticOp,
+) -> Result<ArrayRef, ArrayRef> {
+    if left.data_type() != right.data_type() || left.len() != right.len() {
+        return Err(right);
+    }
+    dispatch_inplace_binary!(right, left, op, try_inplace_binary_rhs)
+}
+
+/// Attempt in-place mutation on the right PrimitiveArray: result\[i\] = op(scalar, right\[i\]).
+fn try_inplace_unary_rhs<T: ArrowPrimitiveType>(
+    array: ArrayRef,
+    scalar: T::Native,
+    op: ArithmeticOp,
+) -> Result<ArrayRef, ArrayRef>
+where
+    T::Native: ArrowNativeTypeOp,
+{
+    // For right-side mutation: result = op(scalar, element)
+    // Commutative ops: same as op(element, scalar)
+    // Non-commutative: need reversed argument order
+    type BinFn<N> = fn(N, N) -> N;
+    let op_fn: Option<BinFn<T::Native>> = match op {
+        ArithmeticOp::AddWrapping => Some(ArrowNativeTypeOp::add_wrapping),
+        ArithmeticOp::MulWrapping => Some(ArrowNativeTypeOp::mul_wrapping),
+        ArithmeticOp::SubWrapping => Some(ArrowNativeTypeOp::sub_wrapping),
+        // No Rem: a zero divisor element (even in a null slot) makes mod_wrapping panic.
+        _ => None,
+    };
+
+    let Some(op_fn) = op_fn else {
+        return Err(array);
+    };
+
+    let primitive = array
+        .as_any()
+        .downcast_ref::<PrimitiveArray<T>>()
+        .ok_or_else(|| Arc::clone(&array))?
+        .clone();
+    drop(array);
+
+    // Note: op(scalar, v) — scalar is the left operand
+    match primitive.unary_mut(|v| op_fn(scalar, v)) {
+        Ok(result) => Ok(Arc::new(result)),
+        Err(arr) => Err(Arc::new(arr)),
+    }
+}
+
+/// Attempt in-place mutation on the right PrimitiveArray: result\[i\] = op(left\[i\], right\[i\]).
+/// Note: parameter order is (right_owned, left_ref) to match the dispatch_inplace_binary macro.
+fn try_inplace_binary_rhs<T: ArrowPrimitiveType>(
+    right: ArrayRef,
+    left: &ArrayRef,
+    op: ArithmeticOp,
+) -> Result<ArrayRef, ArrayRef>
+where
+    T::Native: ArrowNativeTypeOp,
+{
+    type BinFn<N> = fn(N, N) -> N;
+    let op_fn: Option<BinFn<T::Native>> = match op {
+        ArithmeticOp::AddWrapping => Some(ArrowNativeTypeOp::add_wrapping),
+        ArithmeticOp::SubWrapping => Some(ArrowNativeTypeOp::sub_wrapping),
+        ArithmeticOp::MulWrapping => Some(ArrowNativeTypeOp::mul_wrapping),
+        _ => None,
+    };
+
+    let Some(op_fn) = op_fn else {
+        return Err(right);
+    };
+
+    let left_primitive = left
+        .as_any()
+        .downcast_ref::<PrimitiveArray<T>>()
+        .ok_or_else(|| Arc::clone(&right))?;
+
+    let right_primitive = right
+        .as_any()
+        .downcast_ref::<PrimitiveArray<T>>()
+        .ok_or_else(|| Arc::clone(&right))?
+        .clone();
+    drop(right);
+
+    let mut builder = match right_primitive.into_builder() {
+        Ok(b) => b,
+        Err(arr) => return Err(Arc::new(arr)),
+    };
+
+    let right_slice = builder.values_slice_mut();
+    let left_values = left_primitive.values();
+
+    // Note: op(left\[i\], right\[i\]) — left is the first operand
+    right_slice
+        .iter_mut()
+        .zip(left_values.iter())
+        .for_each(|(r, l)| *r = op_fn(*l, *r));
+
+    // Merge null buffers from both sides
+    let result = builder.finish();
+    if left_primitive.nulls().is_some() {
+        let merged = NullBuffer::union(result.nulls(), left_primitive.nulls());
+        let result = PrimitiveArray::<T>::new(result.values().clone(), merged);
+        Ok(Arc::new(result))
+    } else {
+        Ok(Arc::new(result))
+    }
+}
+
 /// Applies a binary [`Datum`] comparison operator `op` to `lhs` and `rhs`
 pub fn apply_cmp(
     op: Operator,
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 02628b405ec6c..3c35f983c2eaf 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -40,7 +40,9 @@ use datafusion_expr::statistics::{
     create_bernoulli_from_comparison, new_generic_from_binary_op,
 };
 use datafusion_expr::{ColumnarValue, Operator};
-use datafusion_physical_expr_common::datum::{apply, apply_cmp};
+use datafusion_physical_expr_common::datum::{
+    ArithmeticOp, apply, apply_arithmetic, apply_cmp,
+};
 
 use kernels::{
     bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
@@ -163,6 +165,46 @@ fn boolean_op(
     op(ll, rr).map(|t| Arc::new(t) as _)
 }
 
+/// Boolean AND/OR operation kind for in-place optimization.
+#[derive(Copy, Clone)]
+enum BoolOp {
+    And,
+    Or,
+}
+
+/// Try in-place bitwise AND/OR on boolean arrays when neither side has nulls.
+/// Uses `BooleanBuffer`'s `BitAndAssign`/`BitOrAssign` which internally attempts
+/// `Buffer::into_mutable()` for in-place mutation, falling back to allocation if shared.
+/// Falls back to standard kleene kernel when nulls are present or the lengths differ.
+fn boolean_op_inplace(left: ArrayRef, right: &ArrayRef, op: BoolOp) -> Result<ArrayRef> {
+    // Only optimize the non-null case; kleene logic is needed when nulls are present
+    if left.null_count() != 0 || right.null_count() != 0 || left.len() != right.len() {
+        let kleene_fn = match op {
+            BoolOp::And => and_kleene,
+            BoolOp::Or => or_kleene,
+        };
+        return Ok(boolean_op(&left, &right, kleene_fn)?);
+    }
+
+    let left_bool = as_boolean_array(&left)
+        .expect("boolean_op_inplace failed to downcast left array");
+    let right_bool = as_boolean_array(&right)
+        .expect("boolean_op_inplace failed to downcast right array");
+
+    let right_values = right_bool.values();
+    // Clone left BooleanBuffer (cheap Arc clone), then drop ArrayRef to
+    // reduce refcount so BitAndAssign/BitOrAssign can mutate in-place.
+    let mut left_values = left_bool.values().clone();
+    drop(left);
+
+    match op {
+        BoolOp::And => left_values &= right_values,
+        BoolOp::Or => left_values |= right_values,
+    }
+
+    Ok(Arc::new(BooleanArray::new(left_values, None)))
+}
+
 /// Returns true if both operands are Date types (Date32 or Date64)
 /// Used to detect Date - Date operations which should return Int64 (days difference)
 fn is_date_minus_date(lhs: &DataType, rhs: &DataType) -> bool {
@@ -271,8 +313,6 @@ impl PhysicalExpr for BinaryExpr {
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        use arrow::compute::kernels::numeric::*;
-
         // Evaluate left-hand side expression.
let lhs = self.left.evaluate(batch)?; @@ -338,19 +378,31 @@ impl PhysicalExpr for BinaryExpr { let input_schema = schema.as_ref(); match self.op { - Operator::Plus if self.fail_on_overflow => return apply(&lhs, &rhs, add), - Operator::Plus => return apply(&lhs, &rhs, add_wrapping), + Operator::Plus if self.fail_on_overflow => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::Add); + } + Operator::Plus => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::AddWrapping); + } // Special case: Date - Date returns Int64 (days difference) // This aligns with PostgreSQL, DuckDB, and MySQL behavior Operator::Minus if is_date_minus_date(&left_data_type, &right_data_type) => { return apply_date_subtraction(&lhs, &rhs); } - Operator::Minus if self.fail_on_overflow => return apply(&lhs, &rhs, sub), - Operator::Minus => return apply(&lhs, &rhs, sub_wrapping), - Operator::Multiply if self.fail_on_overflow => return apply(&lhs, &rhs, mul), - Operator::Multiply => return apply(&lhs, &rhs, mul_wrapping), - Operator::Divide => return apply(&lhs, &rhs, div), - Operator::Modulo => return apply(&lhs, &rhs, rem), + Operator::Minus if self.fail_on_overflow => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::Sub); + } + Operator::Minus => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::SubWrapping); + } + Operator::Multiply if self.fail_on_overflow => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::Mul); + } + Operator::Multiply => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::MulWrapping); + } + Operator::Divide => return apply_arithmetic(lhs, rhs, ArithmeticOp::Div), + Operator::Modulo => return apply_arithmetic(lhs, rhs, ArithmeticOp::Rem), Operator::Eq | Operator::NotEq @@ -681,7 +733,7 @@ impl BinaryExpr { | NotLikeMatch | NotILikeMatch => unreachable!(), And => { if left_data_type == &DataType::Boolean { - Ok(boolean_op(&left, &right, and_kleene)?) 
+ boolean_op_inplace(left, &right, BoolOp::And) } else { internal_err!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", @@ -693,7 +745,7 @@ impl BinaryExpr { } Or => { if left_data_type == &DataType::Boolean { - Ok(boolean_op(&left, &right, or_kleene)?) + boolean_op_inplace(left, &right, BoolOp::Or) } else { internal_err!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}",