Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 2 additions & 99 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
jvm_bridge::JVMClasses,
};
use arrow::array::{make_array, ArrayData, ArrayRef, RecordBatch, RecordBatchOptions};
use arrow::compute::{cast_with_options, take, CastOptions};
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::ffi::FFI_ArrowArray;
use arrow::ffi::FFI_ArrowSchema;
Expand Down Expand Up @@ -183,12 +183,6 @@ impl ScanExec {
return Ok(InputBatch::EOF);
}

// Check for selection vectors and get selection indices if needed from
// JVM via FFI
// Selection vectors can be provided by, for instance, Iceberg to
// remove rows that have been deleted.
let selection_indices_arrays = Self::get_selection_indices(env, iter, num_cols)?;

// fetch batch data from JVM via FFI
let (num_rows, array_addrs, schema_addrs) =
Self::allocate_and_fetch_batch(env, iter, num_cols)?;
Expand All @@ -206,22 +200,6 @@ impl ScanExec {

let array = make_array(array_data);

// Apply selection if selection vectors exist (applies to all columns)
let array = if let Some(ref selection_arrays) = selection_indices_arrays {
let indices = &selection_arrays[i];
// Apply the selection using Arrow's take kernel
match take(&*array, &**indices, None) {
Ok(selected_array) => selected_array,
Err(e) => {
return Err(CometError::from(ExecutionError::ArrowError(format!(
"Failed to apply selection for column {i}: {e}",
))));
}
}
} else {
array
};

let array = if arrow_ffi_safe {
// ownership of this array has been transferred to native
// but we still need to unpack dictionary arrays
Expand All @@ -241,19 +219,7 @@ impl ScanExec {
}
}

// If selection was applied, determine the actual row count from the selected arrays
let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays {
if !selection_arrays.is_empty() {
// Use the length of the first selection array as the actual row count
selection_arrays[0].len()
} else {
num_rows as usize
}
} else {
num_rows as usize
};

Ok(InputBatch::new(inputs, Some(actual_num_rows)))
Ok(InputBatch::new(inputs, Some(num_rows as usize)))
})
}

Expand Down Expand Up @@ -303,69 +269,6 @@ impl ScanExec {

Ok((num_rows, array_addrs, schema_addrs))
}

/// Checks for selection vectors and exports selection indices if needed.
/// Returns selection arrays if they exist (applies to all columns).
fn get_selection_indices(
env: &mut jni::Env,
iter: &JObject,
num_cols: usize,
) -> Result<Option<Vec<ArrayRef>>, CometError> {
// Check if all columns have selection vectors
let has_selection_vectors_result: jni::sys::jboolean = unsafe {
jni_call!(env,
comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)?
};
let has_selection_vectors = has_selection_vectors_result;

let selection_indices_arrays = if has_selection_vectors {
// Allocate arrays for selection indices export (one per column)
let mut indices_array_addrs = Vec::with_capacity(num_cols);
let mut indices_schema_addrs = Vec::with_capacity(num_cols);

for _ in 0..num_cols {
let arrow_array = Rc::new(FFI_ArrowArray::empty());
let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
indices_array_addrs.push(Rc::into_raw(arrow_array) as i64);
indices_schema_addrs.push(Rc::into_raw(arrow_schema) as i64);
}

// Prepare JNI arrays for the export call
let indices_array_obj = env.new_long_array(num_cols)?;
let indices_schema_obj = env.new_long_array(num_cols)?;
indices_array_obj.set_region(env, 0, &indices_array_addrs)?;
indices_schema_obj.set_region(env, 0, &indices_schema_addrs)?;

// Export selection indices from JVM
let _exported_count: i32 = unsafe {
jni_call!(env,
comet_batch_iterator(iter).export_selection_indices(
JValue::Object(JObject::from(indices_array_obj).as_ref()),
JValue::Object(JObject::from(indices_schema_obj).as_ref())
) -> i32)?
};

// Convert to ArrayRef for easier handling
let mut selection_arrays = Vec::with_capacity(num_cols);
for i in 0..num_cols {
let array_data =
ArrayData::from_spark((indices_array_addrs[i], indices_schema_addrs[i]))?;
selection_arrays.push(make_array(array_data));

// Drop the references to the FFI arrays
unsafe {
Rc::from_raw(indices_array_addrs[i] as *const FFI_ArrowArray);
Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema);
}
}

Some(selection_arrays)
} else {
None
};

Ok(selection_indices_arrays)
}
}

fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
Expand Down
16 changes: 0 additions & 16 deletions native/jni-bridge/src/batch_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ pub struct CometBatchIterator<'a> {
pub method_has_next_ret: ReturnType,
pub method_next: JMethodID,
pub method_next_ret: ReturnType,
pub method_has_selection_vectors: JMethodID,
pub method_has_selection_vectors_ret: ReturnType,
pub method_export_selection_indices: JMethodID,
pub method_export_selection_indices_ret: ReturnType,
}

impl<'a> CometBatchIterator<'a> {
Expand All @@ -58,18 +54,6 @@ impl<'a> CometBatchIterator<'a> {
jni::jni_sig!("([J[J)I"),
)?,
method_next_ret: ReturnType::Primitive(Primitive::Int),
method_has_selection_vectors: env.get_method_id(
JNIString::new(Self::JVM_CLASS),
jni::jni_str!("hasSelectionVectors"),
jni::jni_sig!("()Z"),
)?,
method_has_selection_vectors_ret: ReturnType::Primitive(Primitive::Boolean),
method_export_selection_indices: env.get_method_id(
JNIString::new(Self::JVM_CLASS),
jni::jni_str!("exportSelectionIndices"),
jni::jni_sig!("([J[J)I"),
)?,
method_export_selection_indices_ret: ReturnType::Primitive(Primitive::Int),
})
}
}
46 changes: 0 additions & 46 deletions spark/src/main/java/org/apache/comet/CometBatchIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.spark.sql.vectorized.ColumnarBatch;

import org.apache.comet.vector.CometSelectionVector;
import org.apache.comet.vector.NativeUtil;

/**
Expand Down Expand Up @@ -91,49 +90,4 @@ public int next(long[] arrayAddrs, long[] schemaAddrs) {

return numRows;
}

/**
* Check if the current batch has selection vectors for all columns.
*
* @return true if all columns are CometSelectionVector instances, false otherwise
*/
public boolean hasSelectionVectors() {
if (currentBatch == null) {
return false;
}

// Check if all columns are CometSelectionVector instances
for (int i = 0; i < currentBatch.numCols(); i++) {
if (!(currentBatch.column(i) instanceof CometSelectionVector)) {
return false;
}
}
return true;
}

/**
* Export selection indices for all columns when they are selection vectors.
*
* @param arrayAddrs The addresses of the ArrowArray structures for indices
* @param schemaAddrs The addresses of the ArrowSchema structures for indices
* @return Number of selection indices arrays exported
*/
public int exportSelectionIndices(long[] arrayAddrs, long[] schemaAddrs) {
if (currentBatch == null) {
return 0;
}

int exportCount = 0;
for (int i = 0; i < currentBatch.numCols(); i++) {
if (currentBatch.column(i) instanceof CometSelectionVector) {
CometSelectionVector selectionVector = (CometSelectionVector) currentBatch.column(i);

// Export the indices vector
nativeUtil.exportSingleVector(
selectionVector.getIndices(), arrayAddrs[exportCount], schemaAddrs[exportCount]);
exportCount++;
}
}
return exportCount;
}
}
41 changes: 11 additions & 30 deletions spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,29 @@

package org.apache.comet.vector;

import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.spark.sql.comet.util.Utils;
import org.apache.spark.unsafe.Platform;

/** A Comet vector whose elements are already decoded (i.e., materialized). */
/**
* A Comet vector backed by a single Arrow {@link ValueVector} whose values are already decoded (vs.
* {@link CometDictionaryVector}, which keeps indices and dictionary values in separate vectors and
* decodes on access).
*
* <p>Caches the most recently read validity byte to amortize null checks during the common
* sequential row-access pattern.
*/
public abstract class CometDecodedVector extends CometVector {
/**
* The vector that stores all the values. For dictionary-backed vector, this is the vector of
* indices.
*/
protected final ValueVector valueVector;

private boolean hasNull;
private int numNulls;
private int numValues;
private final boolean hasNull;
private final int numNulls;
private final int numValues;
private int validityByteCacheIndex = -1;
private byte validityByteCache;
protected boolean isUuid;
Expand All @@ -58,31 +64,6 @@ public ValueVector getValueVector() {
return valueVector;
}

@Override
public void setNumNulls(int numNulls) {
// We don't need to update null count in 'valueVector' since 'ValueVector.getNullCount' will
// re-compute the null count from validity buffer.
this.numNulls = numNulls;
this.hasNull = numNulls != 0;
this.validityByteCacheIndex = -1;
}

@Override
public void setNumValues(int numValues) {
this.numValues = numValues;
if (valueVector instanceof BaseVariableWidthVector) {
BaseVariableWidthVector bv = (BaseVariableWidthVector) valueVector;
// In case `lastSet` is smaller than `numValues`, `setValueCount` will set all the offsets
// within `[lastSet + 1, numValues)` to be empty, which is incorrect in our case.
//
// For instance, this can happen if one first call `setNumValues` with input 100, and then
// again `setNumValues` with 200. The first call will set `lastSet` to 99, while the second
// call will set all strings between indices `[100, 200)` to be empty.
bv.setLastSet(numValues);
}
valueVector.setValueCount(numValues);
}

public int numValues() {
return numValues;
}
Expand Down
Loading
Loading