Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions datafusion/catalog/src/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use crate::TableProvider;
use arrow::array::{
Array, ArrayRef, BooleanArray, RecordBatch as ArrowRecordBatch, UInt64Array,
};
use arrow::buffer::BooleanBuffer;
use arrow::compute::filter_record_batch;
use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, filter_record_batch};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::error::Result;
Expand Down Expand Up @@ -515,31 +516,37 @@ fn evaluate_filters_to_mask(
return Ok(None);
}

let mut combined_mask: Option<BooleanArray> = None;
let mut combined_buf: Option<BooleanBuffer> = None;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about implementing this API:

We could then use it for these cases 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it is a good idea!
I don't see yet any real e2e improvements so closing this one for now :)


for filter_expr in filters {
let physical_expr =
create_physical_expr(filter_expr, df_schema, execution_props)?;

let result = physical_expr.evaluate(batch)?;
let array = result.into_array(batch.num_rows())?;
let bool_array = array
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
"Filter did not evaluate to boolean".to_string(),
)
})?
.clone();

combined_mask = Some(match combined_mask {
Some(existing) => and(&existing, &bool_array)?,
None => bool_array,
});
let bool_array =
array
.as_any()
.downcast_ref::<BooleanArray>()
.ok_or_else(|| {
datafusion_common::DataFusionError::Internal(
"Filter did not evaluate to boolean".to_string(),
)
})?;

// Convert to BooleanBuffer, treating null as false
let buf = match bool_array.nulls() {
Some(nulls) => bool_array.values() & nulls.inner(),
None => bool_array.values().clone(),
};

match &mut combined_buf {
Some(existing) => *existing &= &buf,
None => combined_buf = Some(buf),
}
}

Ok(combined_mask)
Ok(combined_buf.map(|buf| BooleanArray::new(buf, None)))
}

/// Returns a single row with the count of affected rows.
Expand Down
13 changes: 10 additions & 3 deletions datafusion/datasource-parquet/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
ObjectStoreFetch, apply_file_schema_type_coercions, coerce_int96_to_resolution,
};
use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::compute::and;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
Expand Down Expand Up @@ -649,8 +648,16 @@ fn has_any_exact_match(

let scalar_array = value.to_scalar().ok()?;
let eq_mask = eq(&scalar_array, &array).ok()?;
let combined_mask = and(&eq_mask, exactness).ok()?;
Some(combined_mask.true_count() > 0)
// Combine the two masks using BooleanBuffer bitwise AND, treating null as false
let eq_buf = match eq_mask.nulls() {
Some(nulls) => eq_mask.values() & nulls.inner(),
None => eq_mask.values().clone(),
};
let exact_buf = match exactness.nulls() {
Some(nulls) => exactness.values() & nulls.inner(),
None => exactness.values().clone(),
};
Some(BooleanArray::new(&eq_buf & &exact_buf, None).has_true())
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use std::hash::Hash;
use std::mem::size_of_val;
use std::sync::Arc;

use arrow::compute::{and, filter, is_not_null};
use arrow::array::BooleanArray;
use arrow::compute::{filter, is_not_null};
use arrow::datatypes::FieldRef;
use arrow::{array::ArrayRef, datatypes::DataType};
use datafusion_common::ScalarValue;
Expand Down Expand Up @@ -280,7 +281,10 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator {
match (means.null_count() > 0, weights.null_count() > 0) {
// Both have nulls
(true, true) => {
let predicate = and(&is_not_null(&means)?, &is_not_null(&weights)?)?;
let predicate = BooleanArray::new(
is_not_null(&means)?.values() & is_not_null(&weights)?.values(),
None,
);
means = filter(&means, &predicate)?;
weights = filter(&weights, &predicate)?;
}
Expand Down
12 changes: 9 additions & 3 deletions datafusion/functions-aggregate/src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::array::{
Array, AsArray, BooleanArray, Float64Array, NullBufferBuilder, UInt64Array,
downcast_array,
};
use arrow::compute::{and, filter, is_not_null};
use arrow::compute::{filter, is_not_null};
use arrow::datatypes::{FieldRef, Float64Type, UInt64Type};
use arrow::{
array::ArrayRef,
Expand Down Expand Up @@ -174,7 +174,10 @@ impl Accumulator for CorrelationAccumulator {
// calculation logic in children accumulators, and calling only
// calculation part from Correlation
let values = if values[0].null_count() != 0 || values[1].null_count() != 0 {
let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?;
let mask = BooleanArray::new(
is_not_null(&values[0])?.values() & is_not_null(&values[1])?.values(),
None,
);
let values1 = filter(&values[0], &mask)?;
let values2 = filter(&values[1], &mask)?;

Expand Down Expand Up @@ -267,7 +270,10 @@ impl Accumulator for CorrelationAccumulator {

fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = if values[0].null_count() != 0 || values[1].null_count() != 0 {
let mask = and(&is_not_null(&values[0])?, &is_not_null(&values[1])?)?;
let mask = BooleanArray::new(
is_not_null(&values[0])?.values() & is_not_null(&values[1])?.values(),
None,
);
let values1 = filter(&values[0], &mask)?;
let values2 = filter(&values[1], &mask)?;

Expand Down
31 changes: 19 additions & 12 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use arrow::array::{
};
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::compute::kernels::cmp::eq;
use arrow::compute::{self, FilterBuilder, and, take};
use arrow::compute::{self, FilterBuilder, take};
use arrow::datatypes::{
ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type,
};
Expand Down Expand Up @@ -1788,19 +1788,26 @@ pub(super) fn equal_rows_arr(
let arr_left = take(first_left.as_ref(), indices_left, None)?;
let arr_right = take(first_right.as_ref(), indices_right, None)?;

let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
let first_eq = eq_dyn_null(&arr_left, &arr_right, null_equality)?;

// Use map and try_fold to iterate over the remaining pairs of arrays.
// In each iteration, take is used on the pair of arrays and their equality is determined.
// The results are then folded (combined) using the and function to get a final equality result.
equal = iter
.map(|(left, right)| {
let arr_left = take(left.as_ref(), indices_left, None)?;
let arr_right = take(right.as_ref(), indices_right, None)?;
eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)
})
.try_fold(equal, |acc, equal2| and(&acc, &equal2?))?;
// Accumulate equality results using BooleanBuffer bitwise AND, treating null as false.
// This avoids allocating intermediate BooleanArrays with null handling.
let to_filter_buf = |arr: &BooleanArray| -> BooleanBuffer {
match arr.nulls() {
Some(nulls) => arr.values() & nulls.inner(),
None => arr.values().clone(),
}
};

let mut equal_buf = to_filter_buf(&first_eq);
for (left, right) in iter {
let arr_left = take(left.as_ref(), indices_left, None)?;
let arr_right = take(right.as_ref(), indices_right, None)?;
let eq_result = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
equal_buf &= &to_filter_buf(&eq_result);
}

let equal = BooleanArray::new(equal_buf, None);
let filter_builder = FilterBuilder::new(&equal).optimize().build();

let left_filtered = filter_builder.filter(indices_left)?;
Expand Down
Loading