diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 9b91062657a07..ae8d07be7e94d 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -27,8 +27,9 @@ use crate::TableProvider; use arrow::array::{ Array, ArrayRef, BooleanArray, RecordBatch as ArrowRecordBatch, UInt64Array, }; +use arrow::buffer::BooleanBuffer; +use arrow::compute::filter_record_batch; use arrow::compute::kernels::zip::zip; -use arrow::compute::{and, filter_record_batch}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::error::Result; @@ -515,7 +516,7 @@ fn evaluate_filters_to_mask( return Ok(None); } - let mut combined_mask: Option<BooleanArray> = None; + let mut combined_buf: Option<BooleanBuffer> = None; for filter_expr in filters { let physical_expr = @@ -523,23 +524,29 @@ fn evaluate_filters_to_mask( let result = physical_expr.evaluate(batch)?; let array = result.into_array(batch.num_rows())?; - let bool_array = array - .as_any() - .downcast_ref::<BooleanArray>() - .ok_or_else(|| { - datafusion_common::DataFusionError::Internal( - "Filter did not evaluate to boolean".to_string(), - ) - })? - .clone(); - - combined_mask = Some(match combined_mask { - Some(existing) => and(&existing, &bool_array)?, - None => bool_array, - }); + let bool_array = + array + .as_any() + .downcast_ref::<BooleanArray>() + .ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "Filter did not evaluate to boolean".to_string(), + ) + })?; + + // Convert to BooleanBuffer, treating null as false + let buf = match bool_array.nulls() { + Some(nulls) => bool_array.values() & nulls.inner(), + None => bool_array.values().clone(), + }; + + match &mut combined_buf { + Some(existing) => *existing &= &buf, + None => combined_buf = Some(buf), + } } - Ok(combined_mask) + Ok(combined_buf.map(|buf| BooleanArray::new(buf, None))) } /// Returns a single row with the count of affected rows. 
diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 77e29cf35cd5b..0bd3f367159bd 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -22,7 +22,6 @@ use crate::{ ObjectStoreFetch, apply_file_schema_type_coercions, coerce_int96_to_resolution, }; use arrow::array::{Array, ArrayRef, BooleanArray}; -use arrow::compute::and; use arrow::compute::kernels::cmp::eq; use arrow::compute::sum; use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; @@ -649,8 +648,16 @@ fn has_any_exact_match( let scalar_array = value.to_scalar().ok()?; let eq_mask = eq(&scalar_array, &array).ok()?; - let combined_mask = and(&eq_mask, exactness).ok()?; - Some(combined_mask.true_count() > 0) + // Combine the two masks using BooleanBuffer bitwise AND, treating null as false + let eq_buf = match eq_mask.nulls() { + Some(nulls) => eq_mask.values() & nulls.inner(), + None => eq_mask.values().clone(), + }; + let exact_buf = match exactness.nulls() { + Some(nulls) => exactness.values() & nulls.inner(), + None => exactness.values().clone(), + }; + Some(BooleanArray::new(&eq_buf & &exact_buf, None).has_true()) } /// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. 
diff --git a/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs b/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs index 81a4787dd522f..3a819fef49dcd 100644 --- a/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs +++ b/datafusion/functions-aggregate/src/approx_percentile_cont_with_weight.rs @@ -20,7 +20,8 @@ use std::hash::Hash; use std::mem::size_of_val; use std::sync::Arc; -use arrow::compute::{and, filter, is_not_null}; +use arrow::array::BooleanArray; +use arrow::compute::{filter, is_not_null}; use arrow::datatypes::FieldRef; use arrow::{array::ArrayRef, datatypes::DataType}; use datafusion_common::ScalarValue; @@ -280,7 +281,10 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator { match (means.null_count() > 0, weights.null_count() > 0) { // Both have nulls (true, true) => { - let predicate = and(&is_not_null(&means)?, &is_not_null(&weights)?)?; + let predicate = BooleanArray::new( + is_not_null(&means)?.values() & is_not_null(&weights)?.values(), + None, + ); means = filter(&means, &predicate)?; weights = filter(&weights, &predicate)?; } diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs index 2621fcf0bf3c7..73727820667cc 100644 --- a/datafusion/functions-aggregate/src/correlation.rs +++ b/datafusion/functions-aggregate/src/correlation.rs @@ -25,7 +25,7 @@ use arrow::array::{ Array, AsArray, BooleanArray, Float64Array, NullBufferBuilder, UInt64Array, downcast_array, }; -use arrow::compute::{and, filter, is_not_null}; +use arrow::compute::{filter, is_not_null}; use arrow::datatypes::{FieldRef, Float64Type, UInt64Type}; use arrow::{ array::ArrayRef, @@ -174,7 +174,10 @@ impl Accumulator for CorrelationAccumulator { // calculation logic in children accumulators, and calling only // calculation part from Correlation let values = if values[0].null_count() != 0 || values[1].null_count() != 0 { - let mask = 
and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?; + let mask = BooleanArray::new( + is_not_null(&values[0])?.values() & is_not_null(&values[1])?.values(), + None, + ); let values1 = filter(&values[0], &mask)?; let values2 = filter(&values[1], &mask)?; @@ -267,7 +270,10 @@ impl Accumulator for CorrelationAccumulator { fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = if values[0].null_count() != 0 || values[1].null_count() != 0 { - let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?; + let mask = BooleanArray::new( + is_not_null(&values[0])?.values() & is_not_null(&values[1])?.values(), + None, + ); let values1 = filter(&values[0], &mask)?; let values2 = filter(&values[1], &mask)?; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index ffe8cf4a9efd8..b7d09785278a4 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -54,7 +54,7 @@ use arrow::array::{ }; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::compute::kernels::cmp::eq; -use arrow::compute::{self, FilterBuilder, and, take}; +use arrow::compute::{self, FilterBuilder, take}; use arrow::datatypes::{ ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type, }; @@ -1788,19 +1788,26 @@ pub(super) fn equal_rows_arr( let arr_left = take(first_left.as_ref(), indices_left, None)?; let arr_right = take(first_right.as_ref(), indices_right, None)?; - let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; + let first_eq = eq_dyn_null(&arr_left, &arr_right, null_equality)?; - // Use map and try_fold to iterate over the remaining pairs of arrays. - // In each iteration, take is used on the pair of arrays and their equality is determined. - // The results are then folded (combined) using the and function to get a final equality result. 
- equal = iter - .map(|(left, right)| { - let arr_left = take(left.as_ref(), indices_left, None)?; - let arr_right = take(right.as_ref(), indices_right, None)?; - eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality) - }) - .try_fold(equal, |acc, equal2| and(&acc, &equal2?))?; + // Accumulate equality results using BooleanBuffer bitwise AND, treating null as false. + // This avoids allocating intermediate BooleanArrays with null handling. + let to_filter_buf = |arr: &BooleanArray| -> BooleanBuffer { + match arr.nulls() { + Some(nulls) => arr.values() & nulls.inner(), + None => arr.values().clone(), + } + }; + + let mut equal_buf = to_filter_buf(&first_eq); + for (left, right) in iter { + let arr_left = take(left.as_ref(), indices_left, None)?; + let arr_right = take(right.as_ref(), indices_right, None)?; + let eq_result = eq_dyn_null(&arr_left, &arr_right, null_equality)?; + equal_buf &= &to_filter_buf(&eq_result); + } + let equal = BooleanArray::new(equal_buf, None); let filter_builder = FilterBuilder::new(&equal).optimize().build(); let left_filtered = filter_builder.filter(indices_left)?;