From e73a965ceca466c7dc132fe14774619ae6616ff8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 3 Apr 2026 14:12:56 +0200 Subject: [PATCH 1/4] Add in-place buffer reuse for arithmetic binary expression evaluation When evaluating arithmetic binary expressions (+, -, *, /, %), reuse the left operand's buffer in-place when its reference count is 1, avoiding a buffer allocation. This benefits expression chains like a + b + c where intermediate results are consumed only once. Uses PrimitiveArray::unary_mut for array-scalar and into_builder for array-array cases. Only wrapping (infallible) ops use in-place mutation; checked ops fall back to standard Arrow kernels to avoid corrupting buffers on overflow. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-expr-common/src/datum.rs | 257 +++++++++++++++++- .../physical-expr/src/expressions/binary.rs | 34 ++- 2 files changed, 279 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-expr-common/src/datum.rs b/datafusion/physical-expr-common/src/datum.rs index bd5790507f662..8fa834f0a6bc9 100644 --- a/datafusion/physical-expr-common/src/datum.rs +++ b/datafusion/physical-expr-common/src/datum.rs @@ -15,8 +15,12 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::ArrowNativeTypeOp; use arrow::array::BooleanArray; -use arrow::array::{ArrayRef, Datum, make_comparator}; +use arrow::array::types::*; +use arrow::array::{ + Array, ArrayRef, ArrowPrimitiveType, Datum, PrimitiveArray, make_comparator, +}; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::compute::kernels::cmp::{ distinct, eq, gt, gt_eq, lt, lt_eq, neq, not_distinct, @@ -55,6 +59,257 @@ pub fn apply( } } +/// Arithmetic operations that can be applied in-place on primitive arrays. +#[derive(Debug, Copy, Clone)] +pub enum ArithmeticOp { + Add, + AddWrapping, + Sub, + SubWrapping, + Mul, + MulWrapping, + Div, + Rem, +} + +/// Like [`apply`], but takes ownership of `ColumnarValue` inputs to enable +/// in-place buffer reuse for arithmetic on primitive arrays. +/// +/// When the left operand is a primitive array whose underlying buffer has a +/// reference count of 1 (i.e. no other consumers), the arithmetic is performed +/// in-place using [`PrimitiveArray::unary_mut`] or [`PrimitiveArray::try_unary_mut`], +/// avoiding a buffer allocation. If in-place mutation is not possible (shared +/// buffer, non-primitive type, etc.) this falls back to the standard Arrow +/// compute kernel. +pub fn apply_arithmetic( + lhs: ColumnarValue, + rhs: ColumnarValue, + op: ArithmeticOp, +) -> Result { + let f = arithmetic_op_to_fn(op); + match (lhs, rhs) { + (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { + // Try in-place on left array with right array values + match try_apply_inplace_array(left, &right, op) { + Ok(result) => Ok(ColumnarValue::Array(result)), + Err(left) => Ok(ColumnarValue::Array(f(&left, &right)?)), + } + } + (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { + Ok(ColumnarValue::Array(f(&left.to_scalar()?, &right)?)) + } + (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { + // Try in-place on left array with scalar right + match try_apply_inplace_scalar(left, &right, op) { + Ok(result) => Ok(ColumnarValue::Array(result)), + Err(left) => Ok(ColumnarValue::Array(f(&left, &right.to_scalar()?)?)), + } + } + (ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => { + let array = f(&left.to_scalar()?, &right.to_scalar()?)?; + let scalar = ScalarValue::try_from_array(array.as_ref(), 0)?; + Ok(ColumnarValue::Scalar(scalar)) + } + } +} + +fn arithmetic_op_to_fn( + op: ArithmeticOp, +) -> fn(&dyn Datum, &dyn Datum) -> Result { + use arrow::compute::kernels::numeric::*; + match op { + ArithmeticOp::Add => add, + ArithmeticOp::AddWrapping => add_wrapping, + ArithmeticOp::Sub => sub, + ArithmeticOp::SubWrapping => sub_wrapping, + ArithmeticOp::Mul => mul, + ArithmeticOp::MulWrapping => mul_wrapping, + ArithmeticOp::Div => div, + ArithmeticOp::Rem => rem, + } +} + +/// Try to apply arithmetic in-place on `left` array with a scalar `right`. +/// Returns `Ok(result)` on success, or `Err(left)` if in-place not possible. +fn try_apply_inplace_scalar( + left: ArrayRef, + right: &ScalarValue, + op: ArithmeticOp, +) -> Result { + if right.is_null() { + return Err(left); + } + macro_rules! dispatch_inplace_scalar { + ($($arrow_type:ident),*) => { + match left.data_type() { + $( + dt if dt == &<$arrow_type as ArrowPrimitiveType>::DATA_TYPE => { + let scalar_val = right + .to_scalar() + .map_err(|_| Arc::clone(&left))?; + let scalar_arr = scalar_val.get().0; + let rhs_val = scalar_arr + .as_any() + .downcast_ref::>() + .ok_or_else(|| Arc::clone(&left))? + .value(0); + try_inplace_unary::<$arrow_type>(left, rhs_val, op) + } + )* + _ => Err(left), + } + }; + } + dispatch_inplace_scalar!( + Int8Type, + Int16Type, + Int32Type, + Int64Type, + UInt8Type, + UInt16Type, + UInt32Type, + UInt64Type, + Float16Type, + Float32Type, + Float64Type + ) +} + +/// Try to apply arithmetic in-place on `left` array using values from `right` array. +/// Returns `Ok(result)` on success, or `Err(left)` if in-place not possible. +fn try_apply_inplace_array( + left: ArrayRef, + right: &ArrayRef, + op: ArithmeticOp, +) -> Result { + if left.data_type() != right.data_type() { + return Err(left); + } + macro_rules! dispatch_inplace_array { + ($($arrow_type:ident),*) => { + match left.data_type() { + $( + dt if dt == &<$arrow_type as ArrowPrimitiveType>::DATA_TYPE => { + try_inplace_binary::<$arrow_type>(left, right, op) + } + )* + _ => Err(left), + } + }; + } + dispatch_inplace_array!( + Int8Type, + Int16Type, + Int32Type, + Int64Type, + UInt8Type, + UInt16Type, + UInt32Type, + UInt64Type, + Float16Type, + Float32Type, + Float64Type + ) +} + +/// Attempt in-place unary (array op scalar) mutation on a PrimitiveArray. +fn try_inplace_unary( + array: ArrayRef, + scalar: T::Native, + op: ArithmeticOp, +) -> Result +where + T::Native: ArrowNativeTypeOp, +{ + // Clone the PrimitiveArray (cheap — shares the buffer via Arc) + let primitive = array + .as_any() + .downcast_ref::>() + .ok_or_else(|| Arc::clone(&array))? + .clone(); + // Drop the ArrayRef so the buffer's refcount can drop to 1 + drop(array); + + // Only attempt in-place for wrapping (infallible) operations. + // Checked ops (Add, Sub, Mul, Div) can fail mid-way, corrupting the buffer. + // Rem with zero divisor must also fall back for proper error reporting. + type BinFn = fn(N, N) -> N; + let op_fn: Option> = match op { + ArithmeticOp::AddWrapping => Some(ArrowNativeTypeOp::add_wrapping), + ArithmeticOp::SubWrapping => Some(ArrowNativeTypeOp::sub_wrapping), + ArithmeticOp::MulWrapping => Some(ArrowNativeTypeOp::mul_wrapping), + ArithmeticOp::Rem if !scalar.is_zero() => Some(ArrowNativeTypeOp::mod_wrapping), + _ => None, + }; + + let Some(op_fn) = op_fn else { + return Err(Arc::new(primitive)); + }; + + match primitive.unary_mut(|v| op_fn(v, scalar)) { + Ok(result) => Ok(Arc::new(result)), + Err(arr) => Err(Arc::new(arr)), + } +} + +/// Attempt in-place binary (array op array) mutation on a PrimitiveArray. +fn try_inplace_binary( + left: ArrayRef, + right: &ArrayRef, + op: ArithmeticOp, +) -> Result +where + T::Native: ArrowNativeTypeOp, +{ + let right_primitive = right + .as_any() + .downcast_ref::>() + .ok_or_else(|| Arc::clone(&left))?; + + let left_primitive = left + .as_any() + .downcast_ref::>() + .ok_or_else(|| Arc::clone(&left))? + .clone(); + drop(left); + + let mut builder = match left_primitive.into_builder() { + Ok(b) => b, + Err(arr) => return Err(Arc::new(arr)), + }; + + // Only attempt in-place for wrapping (infallible) operations. + type BinFn = fn(N, N) -> N; + let op_fn: Option> = match op { + ArithmeticOp::AddWrapping => Some(ArrowNativeTypeOp::add_wrapping), + ArithmeticOp::SubWrapping => Some(ArrowNativeTypeOp::sub_wrapping), + ArithmeticOp::MulWrapping => Some(ArrowNativeTypeOp::mul_wrapping), + _ => None, + }; + + let Some(op_fn) = op_fn else { + return Err(Arc::new(builder.finish())); + }; + + let left_slice = builder.values_slice_mut(); + let right_values = right_primitive.values(); + + left_slice + .iter_mut() + .zip(right_values.iter()) + .for_each(|(l, r)| *l = op_fn(*l, *r)); + + // Merge null buffers from both sides + let result = builder.finish(); + if right_primitive.nulls().is_some() { + let merged = NullBuffer::union(result.nulls(), right_primitive.nulls()); + let result = PrimitiveArray::::new(result.values().clone(), merged); + Ok(Arc::new(result)) + } else { + Ok(Arc::new(result)) + } +} + /// Applies a binary [`Datum`] comparison operator `op` to `lhs` and `rhs` pub fn apply_cmp( op: Operator, diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 02628b405ec6c..6c56f49957a9a 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -40,7 +40,9 @@ use datafusion_expr::statistics::{ create_bernoulli_from_comparison, new_generic_from_binary_op, }; use datafusion_expr::{ColumnarValue, Operator}; -use datafusion_physical_expr_common::datum::{apply, apply_cmp}; +use datafusion_physical_expr_common::datum::{ + ArithmeticOp, apply, apply_arithmetic, apply_cmp, +}; use kernels::{ bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar, @@ -271,8 +273,6 @@ impl PhysicalExpr for BinaryExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - use arrow::compute::kernels::numeric::*; - // Evaluate left-hand side expression. let lhs = self.left.evaluate(batch)?; @@ -338,19 +338,31 @@ impl PhysicalExpr for BinaryExpr { let input_schema = schema.as_ref(); match self.op { - Operator::Plus if self.fail_on_overflow => return apply(&lhs, &rhs, add), - Operator::Plus => return apply(&lhs, &rhs, add_wrapping), + Operator::Plus if self.fail_on_overflow => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::Add); + } + Operator::Plus => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::AddWrapping); + } // Special case: Date - Date returns Int64 (days difference) // This aligns with PostgreSQL, DuckDB, and MySQL behavior Operator::Minus if is_date_minus_date(&left_data_type, &right_data_type) => { return apply_date_subtraction(&lhs, &rhs); } - Operator::Minus if self.fail_on_overflow => return apply(&lhs, &rhs, sub), - Operator::Minus => return apply(&lhs, &rhs, sub_wrapping), - Operator::Multiply if self.fail_on_overflow => return apply(&lhs, &rhs, mul), - Operator::Multiply => return apply(&lhs, &rhs, mul_wrapping), - Operator::Divide => return apply(&lhs, &rhs, div), - Operator::Modulo => return apply(&lhs, &rhs, rem), + Operator::Minus if self.fail_on_overflow => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::Sub); + } + Operator::Minus => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::SubWrapping); + } + Operator::Multiply if self.fail_on_overflow => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::Mul); + } + Operator::Multiply => { + return apply_arithmetic(lhs, rhs, ArithmeticOp::MulWrapping); + } + Operator::Divide => return apply_arithmetic(lhs, rhs, ArithmeticOp::Div), + Operator::Modulo => return apply_arithmetic(lhs, rhs, ArithmeticOp::Rem), Operator::Eq | Operator::NotEq From 4c6f444850955788454f4387f25f7923c334e0da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 3 Apr 2026 14:17:25 +0200 Subject: [PATCH 2/4] Also try in-place mutation on the right operand When the left operand's buffer cannot be reused (shared reference), try the right operand for in-place mutation. This covers cases like Scalar-Array and Array-Array where the right side has refcount 1. For non-commutative ops (sub, rem), the argument order is preserved correctly: result[i] = op(left[i], right[i]). Also refactors type dispatch into shared macros. Decimal types are excluded from in-place mutation because the result precision/scale differs from the input. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-expr-common/src/datum.rs | 304 +++++++++++++++---- 1 file changed, 242 insertions(+), 62 deletions(-) diff --git a/datafusion/physical-expr-common/src/datum.rs b/datafusion/physical-expr-common/src/datum.rs index 8fa834f0a6bc9..64de90f24f416 100644 --- a/datafusion/physical-expr-common/src/datum.rs +++ b/datafusion/physical-expr-common/src/datum.rs @@ -89,14 +89,21 @@ pub fn apply_arithmetic( let f = arithmetic_op_to_fn(op); match (lhs, rhs) { (ColumnarValue::Array(left), ColumnarValue::Array(right)) => { - // Try in-place on left array with right array values + // Try in-place on left array, then right array match try_apply_inplace_array(left, &right, op) { Ok(result) => Ok(ColumnarValue::Array(result)), - Err(left) => Ok(ColumnarValue::Array(f(&left, &right)?)), + Err(left) => match try_apply_inplace_array_rhs(&left, right, op) { + Ok(result) => Ok(ColumnarValue::Array(result)), + Err(right) => Ok(ColumnarValue::Array(f(&left, &right)?)), + }, } } (ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => { - Ok(ColumnarValue::Array(f(&left.to_scalar()?, &right)?)) + // Try in-place on right array with scalar left (flipped) + match try_apply_inplace_scalar_rhs(right, &left, op) { + Ok(result) => Ok(ColumnarValue::Array(result)), + Err(right) => Ok(ColumnarValue::Array(f(&left.to_scalar()?, &right)?)), + } } (ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => { // Try in-place on left array with scalar right @@ -129,6 +136,110 @@ fn arithmetic_op_to_fn( } } +/// Dispatches an in-place unary (array op scalar) operation across all supported primitive types. +/// `$arr` is the ArrayRef, `$scalar_value` is the &ScalarValue, `$op` is ArithmeticOp, +/// `$fn_name` is the function to call (try_inplace_unary or try_inplace_unary_rhs). +macro_rules! dispatch_inplace_unary { + ($arr:expr, $scalar_value:expr, $op:expr, $fn_name:ident) => {{ + macro_rules! do_dispatch { + ($arrow_type:ty, $arr_inner:expr) => {{ + let scalar_val = $scalar_value + .to_scalar() + .map_err(|_| Arc::clone(&$arr_inner))?; + let scalar_arr = scalar_val.get().0; + let rhs_val = scalar_arr + .as_any() + .downcast_ref::>() + .ok_or_else(|| Arc::clone(&$arr_inner))? + .value(0); + $fn_name::<$arrow_type>($arr_inner, rhs_val, $op) + }}; + } + match $arr.data_type() { + dt if dt == &::DATA_TYPE => { + do_dispatch!(Int8Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(Int16Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(Int32Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(Int64Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(UInt8Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(UInt16Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(UInt32Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(UInt64Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(Float16Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(Float32Type, $arr) + } + dt if dt == &::DATA_TYPE => { + do_dispatch!(Float64Type, $arr) + } + // Decimal types excluded: result precision/scale differs from input + _ => Err($arr), + } + }}; +} + +/// Dispatches an in-place binary (array op array) operation across all supported primitive types. +/// `$arr` is the ArrayRef to mutate, `$other` is the other ArrayRef, `$op` is ArithmeticOp, +/// `$fn_name` is the function to call (try_inplace_binary or try_inplace_binary_rhs). +macro_rules! dispatch_inplace_binary { + ($arr:expr, $other:expr, $op:expr, $fn_name:ident) => {{ + match $arr.data_type() { + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + dt if dt == &::DATA_TYPE => { + $fn_name::($arr, $other, $op) + } + // Decimal types excluded: result precision/scale differs from input + _ => Err($arr), + } + }}; +} + /// Try to apply arithmetic in-place on `left` array with a scalar `right`. /// Returns `Ok(result)` on success, or `Err(left)` if in-place not possible. fn try_apply_inplace_scalar( @@ -139,40 +250,7 @@ fn try_apply_inplace_scalar( if right.is_null() { return Err(left); } - macro_rules! dispatch_inplace_scalar { - ($($arrow_type:ident),*) => { - match left.data_type() { - $( - dt if dt == &<$arrow_type as ArrowPrimitiveType>::DATA_TYPE => { - let scalar_val = right - .to_scalar() - .map_err(|_| Arc::clone(&left))?; - let scalar_arr = scalar_val.get().0; - let rhs_val = scalar_arr - .as_any() - .downcast_ref::>() - .ok_or_else(|| Arc::clone(&left))? - .value(0); - try_inplace_unary::<$arrow_type>(left, rhs_val, op) - } - )* - _ => Err(left), - } - }; - } - dispatch_inplace_scalar!( - Int8Type, - Int16Type, - Int32Type, - Int64Type, - UInt8Type, - UInt16Type, - UInt32Type, - UInt64Type, - Float16Type, - Float32Type, - Float64Type - ) + dispatch_inplace_unary!(left, right, op, try_inplace_unary) } /// Try to apply arithmetic in-place on `left` array using values from `right` array. @@ -185,31 +263,7 @@ fn try_apply_inplace_array( if left.data_type() != right.data_type() { return Err(left); } - macro_rules! dispatch_inplace_array { - ($($arrow_type:ident),*) => { - match left.data_type() { - $( - dt if dt == &<$arrow_type as ArrowPrimitiveType>::DATA_TYPE => { - try_inplace_binary::<$arrow_type>(left, right, op) - } - )* - _ => Err(left), - } - }; - } - dispatch_inplace_array!( - Int8Type, - Int16Type, - Int32Type, - Int64Type, - UInt8Type, - UInt16Type, - UInt32Type, - UInt64Type, - Float16Type, - Float32Type, - Float64Type - ) + dispatch_inplace_binary!(left, right, op, try_inplace_binary) } /// Attempt in-place unary (array op scalar) mutation on a PrimitiveArray. @@ -310,6 +364,132 @@ where } } +/// Try to apply arithmetic in-place on `right` array with a scalar `left`. +/// The operation is `result[i] = op(scalar_left, right[i])`, stored in `right`'s buffer. +/// Returns `Ok(result)` on success, or `Err(right)` if in-place not possible. +fn try_apply_inplace_scalar_rhs( + right: ArrayRef, + left: &ScalarValue, + op: ArithmeticOp, +) -> Result { + if left.is_null() { + return Err(right); + } + dispatch_inplace_unary!(right, left, op, try_inplace_unary_rhs) +} + +/// Try to apply arithmetic in-place on `right` array using values from `left` array. +/// The operation is `result[i] = op(left[i], right[i])`, stored in `right`'s buffer. +/// Returns `Ok(result)` on success, or `Err(right)` if in-place not possible. +fn try_apply_inplace_array_rhs( + left: &ArrayRef, + right: ArrayRef, + op: ArithmeticOp, +) -> Result { + if left.data_type() != right.data_type() { + return Err(right); + } + dispatch_inplace_binary!(right, left, op, try_inplace_binary_rhs) +} + +/// Attempt in-place mutation on the right PrimitiveArray: result[i] = op(scalar, right[i]). +fn try_inplace_unary_rhs( + array: ArrayRef, + scalar: T::Native, + op: ArithmeticOp, +) -> Result +where + T::Native: ArrowNativeTypeOp, +{ + let primitive = array + .as_any() + .downcast_ref::>() + .ok_or_else(|| Arc::clone(&array))? + .clone(); + drop(array); + + // For right-side mutation: result = op(scalar, element) + // Commutative ops: same as op(element, scalar) + // Non-commutative: need reversed argument order + type BinFn = fn(N, N) -> N; + let op_fn: Option> = match op { + ArithmeticOp::AddWrapping => Some(ArrowNativeTypeOp::add_wrapping), + ArithmeticOp::MulWrapping => Some(ArrowNativeTypeOp::mul_wrapping), + ArithmeticOp::SubWrapping => Some(ArrowNativeTypeOp::sub_wrapping), + ArithmeticOp::Rem if !scalar.is_zero() => Some(ArrowNativeTypeOp::mod_wrapping), + _ => None, + }; + + let Some(op_fn) = op_fn else { + return Err(Arc::new(primitive)); + }; + + // Note: op(scalar, v) — scalar is the left operand + match primitive.unary_mut(|v| op_fn(scalar, v)) { + Ok(result) => Ok(Arc::new(result)), + Err(arr) => Err(Arc::new(arr)), + } +} + +/// Attempt in-place mutation on the right PrimitiveArray: result[i] = op(left[i], right[i]). +/// Note: parameter order is (right_owned, left_ref) to match the dispatch_inplace_binary macro. +fn try_inplace_binary_rhs( + right: ArrayRef, + left: &ArrayRef, + op: ArithmeticOp, +) -> Result +where + T::Native: ArrowNativeTypeOp, +{ + let left_primitive = left + .as_any() + .downcast_ref::>() + .ok_or_else(|| Arc::clone(&right))?; + + let right_primitive = right + .as_any() + .downcast_ref::>() + .ok_or_else(|| Arc::clone(&right))? + .clone(); + drop(right); + + let mut builder = match right_primitive.into_builder() { + Ok(b) => b, + Err(arr) => return Err(Arc::new(arr)), + }; + + type BinFn = fn(N, N) -> N; + let op_fn: Option> = match op { + ArithmeticOp::AddWrapping => Some(ArrowNativeTypeOp::add_wrapping), + ArithmeticOp::SubWrapping => Some(ArrowNativeTypeOp::sub_wrapping), + ArithmeticOp::MulWrapping => Some(ArrowNativeTypeOp::mul_wrapping), + _ => None, + }; + + let Some(op_fn) = op_fn else { + return Err(Arc::new(builder.finish())); + }; + + let right_slice = builder.values_slice_mut(); + let left_values = left_primitive.values(); + + // Note: op(left[i], right[i]) — left is the first operand + right_slice + .iter_mut() + .zip(left_values.iter()) + .for_each(|(r, l)| *r = op_fn(*l, *r)); + + // Merge null buffers from both sides + let result = builder.finish(); + if left_primitive.nulls().is_some() { + let merged = NullBuffer::union(result.nulls(), left_primitive.nulls()); + let result = PrimitiveArray::::new(result.values().clone(), merged); + Ok(Arc::new(result)) + } else { + Ok(Arc::new(result)) + } +} + /// Applies a binary [`Datum`] comparison operator `op` to `lhs` and `rhs` pub fn apply_cmp( op: Operator, From 0b8616ec0d2c0ba537a9f7cb6ea7f5edced3e761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 3 Apr 2026 14:43:12 +0200 Subject: [PATCH 3/4] Add in-place bitwise AND/OR for boolean arrays When evaluating boolean AND/OR expressions, try to reuse the left buffer in-place via Buffer::into_mutable. If the left buffer is shared, try the right buffer (AND/OR are commutative). Falls back to standard and_kleene/or_kleene when both buffers are shared or when nulls are present. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-expr/src/expressions/binary.rs | 104 +++++++++++++++++- 1 file changed, 102 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 6c56f49957a9a..de00cad8f67fd 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -23,6 +23,7 @@ use std::hash::Hash; use std::{any::Any, sync::Arc}; use arrow::array::*; +use arrow::buffer::BooleanBuffer; use arrow::compute::kernels::boolean::{and_kleene, or_kleene}; use arrow::compute::kernels::concat_elements::concat_elements_utf8; use arrow::compute::{SlicesIterator, cast, filter_record_batch}; @@ -165,6 +166,105 @@ fn boolean_op( op(ll, rr).map(|t| Arc::new(t) as _) } +/// Boolean AND/OR operation kind for in-place optimization. +#[derive(Copy, Clone)] +enum BoolOp { + And, + Or, +} + +/// Try in-place bitwise AND/OR on boolean arrays when neither side has nulls +/// and both have zero offset. Tries left first, then right. +/// Falls back to the standard kleene kernel otherwise. +fn boolean_op_inplace(left: ArrayRef, right: ArrayRef, op: BoolOp) -> Result { + // Only optimize the non-null, zero-offset case + if left.null_count() != 0 || right.null_count() != 0 || left.len() != right.len() { + let kleene_fn = match op { + BoolOp::And => and_kleene, + BoolOp::Or => or_kleene, + }; + return Ok(boolean_op(&left, &right, kleene_fn)?); + } + + let left_bool = as_boolean_array(&left) + .expect("boolean_op_inplace failed to downcast left array"); + let right_bool = as_boolean_array(&right) + .expect("boolean_op_inplace failed to downcast right array"); + + if left_bool.offset() != 0 || right_bool.offset() != 0 { + let kleene_fn = match op { + BoolOp::And => and_kleene, + BoolOp::Or => or_kleene, + }; + return Ok(boolean_op(&left, &right, kleene_fn)?); + } + + let len = left_bool.len(); + let byte_len = len.div_ceil(8); + + // Try left first + let other_bytes = right_bool.values().inner().as_slice(); + let left_clone = left_bool.clone(); + drop(left); + let (left_values, _nulls) = left_clone.into_parts(); + match left_values.into_inner().into_mutable() { + Ok(mut mutable) => { + apply_bool_assign(mutable.as_slice_mut(), other_bytes, byte_len, op); + Ok(Arc::new(BooleanArray::new( + BooleanBuffer::new(mutable.into(), 0, len), + None, + ))) + } + Err(left_buf) => { + // Left buffer shared — try right + let left_bytes = left_buf.as_slice(); + let right_clone = right_bool.clone(); + drop(right); + let (right_values, _nulls) = right_clone.into_parts(); + match right_values.into_inner().into_mutable() { + Ok(mut mutable) => { + // AND/OR are commutative, so we can swap operands + apply_bool_assign(mutable.as_slice_mut(), left_bytes, byte_len, op); + Ok(Arc::new(BooleanArray::new( + BooleanBuffer::new(mutable.into(), 0, len), + None, + ))) + } + Err(right_buf) => { + // Both shared — fall back + let left_arr = + BooleanArray::new(BooleanBuffer::new(left_buf, 0, len), None); + let right_arr = + BooleanArray::new(BooleanBuffer::new(right_buf, 0, len), None); + let kleene_fn = match op { + BoolOp::And => and_kleene, + BoolOp::Or => or_kleene, + }; + Ok(boolean_op(&left_arr, &right_arr, kleene_fn)?) + } + } + } + } +} + +#[inline] +fn apply_bool_assign(dst: &mut [u8], src: &[u8], byte_len: usize, op: BoolOp) { + match op { + BoolOp::And => { + dst[..byte_len] + .iter_mut() + .zip(&src[..byte_len]) + .for_each(|(d, s)| *d &= s); + } + BoolOp::Or => { + dst[..byte_len] + .iter_mut() + .zip(&src[..byte_len]) + .for_each(|(d, s)| *d |= s); + } + } +} + /// Returns true if both operands are Date types (Date32 or Date64) /// Used to detect Date - Date operations which should return Int64 (days difference) fn is_date_minus_date(lhs: &DataType, rhs: &DataType) -> bool { @@ -693,7 +793,7 @@ impl BinaryExpr { | NotLikeMatch | NotILikeMatch => unreachable!(), And => { if left_data_type == &DataType::Boolean { - Ok(boolean_op(&left, &right, and_kleene)?) + boolean_op_inplace(left, right, BoolOp::And) } else { internal_err!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", @@ -705,7 +805,7 @@ impl BinaryExpr { } Or => { if left_data_type == &DataType::Boolean { - Ok(boolean_op(&left, &right, or_kleene)?) + boolean_op_inplace(left, right, BoolOp::Or) } else { internal_err!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", From f70c9e7cc8b0787cbbc1ce12975e77fbaaeacd2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 3 Apr 2026 14:43:44 +0200 Subject: [PATCH 4/4] Add in-place bitwise AND/OR for boolean arrays When evaluating boolean AND/OR expressions without nulls, use BooleanBuffer's BitAndAssign/BitOrAssign which internally attempts Buffer::into_mutable() for in-place mutation. Falls back to and_kleene/or_kleene when nulls are present. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-expr-common/src/datum.rs | 10 +- .../physical-expr/src/expressions/binary.rs | 92 ++++--------------- 2 files changed, 21 insertions(+), 81 deletions(-) diff --git a/datafusion/physical-expr-common/src/datum.rs b/datafusion/physical-expr-common/src/datum.rs index 64de90f24f416..6098e938091ea 100644 --- a/datafusion/physical-expr-common/src/datum.rs +++ b/datafusion/physical-expr-common/src/datum.rs @@ -365,7 +365,7 @@ where } /// Try to apply arithmetic in-place on `right` array with a scalar `left`. -/// The operation is `result[i] = op(scalar_left, right[i])`, stored in `right`'s buffer. +/// The operation is `result\[i\] = op(scalar_left, right\[i\])`, stored in `right`'s buffer. /// Returns `Ok(result)` on success, or `Err(right)` if in-place not possible. fn try_apply_inplace_scalar_rhs( right: ArrayRef, @@ -379,7 +379,7 @@ fn try_apply_inplace_scalar_rhs( } /// Try to apply arithmetic in-place on `right` array using values from `left` array. -/// The operation is `result[i] = op(left[i], right[i])`, stored in `right`'s buffer. +/// The operation is `result\[i\] = op(left\[i\], right\[i\])`, stored in `right`'s buffer. /// Returns `Ok(result)` on success, or `Err(right)` if in-place not possible. fn try_apply_inplace_array_rhs( left: &ArrayRef, @@ -392,7 +392,7 @@ fn try_apply_inplace_array_rhs( dispatch_inplace_binary!(right, left, op, try_inplace_binary_rhs) } -/// Attempt in-place mutation on the right PrimitiveArray: result[i] = op(scalar, right[i]). +/// Attempt in-place mutation on the right PrimitiveArray: result\[i\] = op(scalar, right\[i\]). fn try_inplace_unary_rhs( array: ArrayRef, scalar: T::Native, @@ -431,7 +431,7 @@ where } } -/// Attempt in-place mutation on the right PrimitiveArray: result[i] = op(left[i], right[i]). +/// Attempt in-place mutation on the right PrimitiveArray: result\[i\] = op(left\[i\], right\[i\]). /// Note: parameter order is (right_owned, left_ref) to match the dispatch_inplace_binary macro. fn try_inplace_binary_rhs( right: ArrayRef, @@ -473,7 +473,7 @@ where let right_slice = builder.values_slice_mut(); let left_values = left_primitive.values(); - // Note: op(left[i], right[i]) — left is the first operand + // Note: op(left\[i\], right\[i\]) — left is the first operand right_slice .iter_mut() .zip(left_values.iter()) diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index de00cad8f67fd..3c35f983c2eaf 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -23,7 +23,6 @@ use std::hash::Hash; use std::{any::Any, sync::Arc}; use arrow::array::*; -use arrow::buffer::BooleanBuffer; use arrow::compute::kernels::boolean::{and_kleene, or_kleene}; use arrow::compute::kernels::concat_elements::concat_elements_utf8; use arrow::compute::{SlicesIterator, cast, filter_record_batch}; @@ -173,11 +172,12 @@ enum BoolOp { Or, } -/// Try in-place bitwise AND/OR on boolean arrays when neither side has nulls -/// and both have zero offset. Tries left first, then right. -/// Falls back to the standard kleene kernel otherwise. -fn boolean_op_inplace(left: ArrayRef, right: ArrayRef, op: BoolOp) -> Result { - // Only optimize the non-null, zero-offset case +/// Try in-place bitwise AND/OR on boolean arrays when neither side has nulls. +/// Uses `BooleanBuffer`'s `BitAndAssign`/`BitOrAssign` which internally attempts +/// `Buffer::into_mutable()` for in-place mutation, falling back to allocation if shared. +/// Falls back to standard kleene kernel when nulls are present. +fn boolean_op_inplace(left: ArrayRef, right: &ArrayRef, op: BoolOp) -> Result { + // Only optimize the non-null case; kleene logic is needed when nulls are present if left.null_count() != 0 || right.null_count() != 0 || left.len() != right.len() { let kleene_fn = match op { BoolOp::And => and_kleene, @@ -191,78 +191,18 @@ fn boolean_op_inplace(left: ArrayRef, right: ArrayRef, op: BoolOp) -> Result and_kleene, - BoolOp::Or => or_kleene, - }; - return Ok(boolean_op(&left, &right, kleene_fn)?); - } - - let len = left_bool.len(); - let byte_len = len.div_ceil(8); - - // Try left first - let other_bytes = right_bool.values().inner().as_slice(); - let left_clone = left_bool.clone(); + let right_values = right_bool.values(); + // Clone left BooleanBuffer (cheap Arc clone), then drop ArrayRef to + // reduce refcount so BitAndAssign/BitOrAssign can mutate in-place. + let mut left_values = left_bool.values().clone(); drop(left); - let (left_values, _nulls) = left_clone.into_parts(); - match left_values.into_inner().into_mutable() { - Ok(mut mutable) => { - apply_bool_assign(mutable.as_slice_mut(), other_bytes, byte_len, op); - Ok(Arc::new(BooleanArray::new( - BooleanBuffer::new(mutable.into(), 0, len), - None, - ))) - } - Err(left_buf) => { - // Left buffer shared — try right - let left_bytes = left_buf.as_slice(); - let right_clone = right_bool.clone(); - drop(right); - let (right_values, _nulls) = right_clone.into_parts(); - match right_values.into_inner().into_mutable() { - Ok(mut mutable) => { - // AND/OR are commutative, so we can swap operands - apply_bool_assign(mutable.as_slice_mut(), left_bytes, byte_len, op); - Ok(Arc::new(BooleanArray::new( - BooleanBuffer::new(mutable.into(), 0, len), - None, - ))) - } - Err(right_buf) => { - // Both shared — fall back - let left_arr = - BooleanArray::new(BooleanBuffer::new(left_buf, 0, len), None); - let right_arr = - BooleanArray::new(BooleanBuffer::new(right_buf, 0, len), None); - let kleene_fn = match op { - BoolOp::And => and_kleene, - BoolOp::Or => or_kleene, - }; - Ok(boolean_op(&left_arr, &right_arr, kleene_fn)?) - } - } - } - } -} -#[inline] -fn apply_bool_assign(dst: &mut [u8], src: &[u8], byte_len: usize, op: BoolOp) { match op { - BoolOp::And => { - dst[..byte_len] - .iter_mut() - .zip(&src[..byte_len]) - .for_each(|(d, s)| *d &= s); - } - BoolOp::Or => { - dst[..byte_len] - .iter_mut() - .zip(&src[..byte_len]) - .for_each(|(d, s)| *d |= s); - } + BoolOp::And => left_values &= right_values, + BoolOp::Or => left_values |= right_values, } + + Ok(Arc::new(BooleanArray::new(left_values, None))) } /// Returns true if both operands are Date types (Date32 or Date64) @@ -793,7 +733,7 @@ impl BinaryExpr { | NotLikeMatch | NotILikeMatch => unreachable!(), And => { if left_data_type == &DataType::Boolean { - boolean_op_inplace(left, right, BoolOp::And) + boolean_op_inplace(left, &right, BoolOp::And) } else { internal_err!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", @@ -805,7 +745,7 @@ impl BinaryExpr { } Or => { if left_data_type == &DataType::Boolean { - boolean_op_inplace(left, right, BoolOp::Or) + boolean_op_inplace(left, &right, BoolOp::Or) } else { internal_err!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}",