From 050221b35e752633e5fd5ad46761b01aaf646213 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 23 May 2026 09:23:11 -0600 Subject: [PATCH] chore: remove dead vector classes left over from native_iceberg_compat removal CometSelectionVector and CometDelegateVector became unreachable when the native_iceberg_compat JVM Parquet reader was removed (7203fb596, 0fd52b72d). Nothing on the live execution path constructs them, so the matching JNI methods, codegen branches, and vector-mutation API are also dead. - Delete CometSelectionVector and CometDelegateVector - Remove hasSelectionVectors/exportSelectionIndices JNI methods and the get_selection_indices branch in the native ScanExec::get_next path - Remove the now-orphaned setNumNulls/setNumValues API from CometVector, CometDecodedVector, and CometPlainVector - Remove the isReused/setReused field and accessors on CometPlainVector (no constructor sets it to true and no caller invokes setReused), and simplify the SPARK-50235 cleanup loop in CometColumnarToRowExec to always close per-batch vectors - Add class-level documentation to the remaining vector classes explaining why a custom hierarchy is needed (Arrow shading, FFI lifecycle, lazy dictionary decoding) --- native/core/src/execution/operators/scan.rs | 101 +------ native/jni-bridge/src/batch_iterator.rs | 16 - .../org/apache/comet/CometBatchIterator.java | 46 --- .../comet/vector/CometDecodedVector.java | 41 +-- .../comet/vector/CometDelegateVector.java | 170 ----------- .../apache/comet/vector/CometDictionary.java | 9 +- .../comet/vector/CometDictionaryVector.java | 12 +- .../apache/comet/vector/CometListVector.java | 5 +- .../apache/comet/vector/CometMapVector.java | 6 +- .../apache/comet/vector/CometPlainVector.java | 29 +- .../comet/vector/CometSelectionVector.java | 277 ------------------ .../comet/vector/CometStructVector.java | 6 +- .../org/apache/comet/vector/CometVector.java | 31 +- .../org/apache/comet/vector/NativeUtil.scala | 59 ---- 
.../sql/comet/CometColumnarToRowExec.scala | 13 +- 15 files changed, 74 insertions(+), 747 deletions(-) delete mode 100644 spark/src/main/java/org/apache/comet/vector/CometDelegateVector.java delete mode 100644 spark/src/main/java/org/apache/comet/vector/CometSelectionVector.java diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 90bb741b5e..e318d9e66b 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -24,7 +24,7 @@ use crate::{ jvm_bridge::JVMClasses, }; use arrow::array::{make_array, ArrayData, ArrayRef, RecordBatch, RecordBatchOptions}; -use arrow::compute::{cast_with_options, take, CastOptions}; +use arrow::compute::{cast_with_options, CastOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::ffi::FFI_ArrowArray; use arrow::ffi::FFI_ArrowSchema; @@ -183,12 +183,6 @@ impl ScanExec { return Ok(InputBatch::EOF); } - // Check for selection vectors and get selection indices if needed from - // JVM via FFI - // Selection vectors can be provided by, for instance, Iceberg to - // remove rows that have been deleted. 
- let selection_indices_arrays = Self::get_selection_indices(env, iter, num_cols)?; - // fetch batch data from JVM via FFI let (num_rows, array_addrs, schema_addrs) = Self::allocate_and_fetch_batch(env, iter, num_cols)?; @@ -206,22 +200,6 @@ impl ScanExec { let array = make_array(array_data); - // Apply selection if selection vectors exist (applies to all columns) - let array = if let Some(ref selection_arrays) = selection_indices_arrays { - let indices = &selection_arrays[i]; - // Apply the selection using Arrow's take kernel - match take(&*array, &**indices, None) { - Ok(selected_array) => selected_array, - Err(e) => { - return Err(CometError::from(ExecutionError::ArrowError(format!( - "Failed to apply selection for column {i}: {e}", - )))); - } - } - } else { - array - }; - let array = if arrow_ffi_safe { // ownership of this array has been transferred to native // but we still need to unpack dictionary arrays @@ -241,19 +219,7 @@ impl ScanExec { } } - // If selection was applied, determine the actual row count from the selected arrays - let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays { - if !selection_arrays.is_empty() { - // Use the length of the first selection array as the actual row count - selection_arrays[0].len() - } else { - num_rows as usize - } - } else { - num_rows as usize - }; - - Ok(InputBatch::new(inputs, Some(actual_num_rows))) + Ok(InputBatch::new(inputs, Some(num_rows as usize))) }) } @@ -303,69 +269,6 @@ impl ScanExec { Ok((num_rows, array_addrs, schema_addrs)) } - - /// Checks for selection vectors and exports selection indices if needed. - /// Returns selection arrays if they exist (applies to all columns). 
- fn get_selection_indices( - env: &mut jni::Env, - iter: &JObject, - num_cols: usize, - ) -> Result<Option<Vec<ArrayRef>>, CometError> { - // Check if all columns have selection vectors - let has_selection_vectors_result: jni::sys::jboolean = unsafe { - jni_call!(env, - comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)? - }; - let has_selection_vectors = has_selection_vectors_result; - - let selection_indices_arrays = if has_selection_vectors { - // Allocate arrays for selection indices export (one per column) - let mut indices_array_addrs = Vec::with_capacity(num_cols); - let mut indices_schema_addrs = Vec::with_capacity(num_cols); - - for _ in 0..num_cols { - let arrow_array = Rc::new(FFI_ArrowArray::empty()); - let arrow_schema = Rc::new(FFI_ArrowSchema::empty()); - indices_array_addrs.push(Rc::into_raw(arrow_array) as i64); - indices_schema_addrs.push(Rc::into_raw(arrow_schema) as i64); - } - - // Prepare JNI arrays for the export call - let indices_array_obj = env.new_long_array(num_cols)?; - let indices_schema_obj = env.new_long_array(num_cols)?; - indices_array_obj.set_region(env, 0, &indices_array_addrs)?; - indices_schema_obj.set_region(env, 0, &indices_schema_addrs)?; - - // Export selection indices from JVM - let _exported_count: i32 = unsafe { - jni_call!(env, - comet_batch_iterator(iter).export_selection_indices( - JValue::Object(JObject::from(indices_array_obj).as_ref()), - JValue::Object(JObject::from(indices_schema_obj).as_ref()) - ) -> i32)?
- }; - - // Convert to ArrayRef for easier handling - let mut selection_arrays = Vec::with_capacity(num_cols); - for i in 0..num_cols { - let array_data = - ArrayData::from_spark((indices_array_addrs[i], indices_schema_addrs[i]))?; - selection_arrays.push(make_array(array_data)); - - // Drop the references to the FFI arrays - unsafe { - Rc::from_raw(indices_array_addrs[i] as *const FFI_ArrowArray); - Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema); - } - } - - Some(selection_arrays) - } else { - None - }; - - Ok(selection_indices_arrays) - } } fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef { diff --git a/native/jni-bridge/src/batch_iterator.rs b/native/jni-bridge/src/batch_iterator.rs index 65ca7e7d11..addda133fa 100644 --- a/native/jni-bridge/src/batch_iterator.rs +++ b/native/jni-bridge/src/batch_iterator.rs @@ -32,10 +32,6 @@ pub struct CometBatchIterator<'a> { pub method_has_next_ret: ReturnType, pub method_next: JMethodID, pub method_next_ret: ReturnType, - pub method_has_selection_vectors: JMethodID, - pub method_has_selection_vectors_ret: ReturnType, - pub method_export_selection_indices: JMethodID, - pub method_export_selection_indices_ret: ReturnType, } impl<'a> CometBatchIterator<'a> { @@ -58,18 +54,6 @@ impl<'a> CometBatchIterator<'a> { jni::jni_sig!("([J[J)I"), )?, method_next_ret: ReturnType::Primitive(Primitive::Int), - method_has_selection_vectors: env.get_method_id( - JNIString::new(Self::JVM_CLASS), - jni::jni_str!("hasSelectionVectors"), - jni::jni_sig!("()Z"), - )?, - method_has_selection_vectors_ret: ReturnType::Primitive(Primitive::Boolean), - method_export_selection_indices: env.get_method_id( - JNIString::new(Self::JVM_CLASS), - jni::jni_str!("exportSelectionIndices"), - jni::jni_sig!("([J[J)I"), - )?, - method_export_selection_indices_ret: ReturnType::Primitive(Primitive::Int), }) } } diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java 
b/spark/src/main/java/org/apache/comet/CometBatchIterator.java index 4f45f98a6b..9b48a47c57 100644 --- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java +++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java @@ -23,7 +23,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.comet.vector.CometSelectionVector; import org.apache.comet.vector.NativeUtil; /** @@ -91,49 +90,4 @@ public int next(long[] arrayAddrs, long[] schemaAddrs) { return numRows; } - - /** - * Check if the current batch has selection vectors for all columns. - * - * @return true if all columns are CometSelectionVector instances, false otherwise - */ - public boolean hasSelectionVectors() { - if (currentBatch == null) { - return false; - } - - // Check if all columns are CometSelectionVector instances - for (int i = 0; i < currentBatch.numCols(); i++) { - if (!(currentBatch.column(i) instanceof CometSelectionVector)) { - return false; - } - } - return true; - } - - /** - * Export selection indices for all columns when they are selection vectors. 
- * - * @param arrayAddrs The addresses of the ArrowArray structures for indices - * @param schemaAddrs The addresses of the ArrowSchema structures for indices - * @return Number of selection indices arrays exported - */ - public int exportSelectionIndices(long[] arrayAddrs, long[] schemaAddrs) { - if (currentBatch == null) { - return 0; - } - - int exportCount = 0; - for (int i = 0; i < currentBatch.numCols(); i++) { - if (currentBatch.column(i) instanceof CometSelectionVector) { - CometSelectionVector selectionVector = (CometSelectionVector) currentBatch.column(i); - - // Export the indices vector - nativeUtil.exportSingleVector( - selectionVector.getIndices(), arrayAddrs[exportCount], schemaAddrs[exportCount]); - exportCount++; - } - } - return exportCount; - } } diff --git a/spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java b/spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java index a37683df2c..157184a0c2 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java @@ -19,13 +19,19 @@ package org.apache.comet.vector; -import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.types.pojo.Field; import org.apache.spark.sql.comet.util.Utils; import org.apache.spark.unsafe.Platform; -/** A Comet vector whose elements are already decoded (i.e., materialized). */ +/** + * A Comet vector backed by a single Arrow {@link ValueVector} whose values are already decoded (vs. + * {@link CometDictionaryVector}, which keeps indices and dictionary values in separate vectors and + * decodes on access). + * + *

Caches the most recently read validity byte to amortize null checks during the common + * sequential row-access pattern. + */ public abstract class CometDecodedVector extends CometVector { /** * The vector that stores all the values. For dictionary-backed vector, this is the vector of @@ -33,9 +39,9 @@ public abstract class CometDecodedVector extends CometVector { */ protected final ValueVector valueVector; - private boolean hasNull; - private int numNulls; - private int numValues; + private final boolean hasNull; + private final int numNulls; + private final int numValues; private int validityByteCacheIndex = -1; private byte validityByteCache; protected boolean isUuid; @@ -58,31 +64,6 @@ public ValueVector getValueVector() { return valueVector; } - @Override - public void setNumNulls(int numNulls) { - // We don't need to update null count in 'valueVector' since 'ValueVector.getNullCount' will - // re-compute the null count from validity buffer. - this.numNulls = numNulls; - this.hasNull = numNulls != 0; - this.validityByteCacheIndex = -1; - } - - @Override - public void setNumValues(int numValues) { - this.numValues = numValues; - if (valueVector instanceof BaseVariableWidthVector) { - BaseVariableWidthVector bv = (BaseVariableWidthVector) valueVector; - // In case `lastSet` is smaller than `numValues`, `setValueCount` will set all the offsets - // within `[lastSet + 1, numValues)` to be empty, which is incorrect in our case. - // - // For instance, this can happen if one first call `setNumValues` with input 100, and then - // again `setNumValues` with 200. The first call will set `lastSet` to 99, while the second - // call will set all strings between indices `[100, 200)` to be empty. 
- bv.setLastSet(numValues); - } - valueVector.setValueCount(numValues); - } - public int numValues() { return numValues; } diff --git a/spark/src/main/java/org/apache/comet/vector/CometDelegateVector.java b/spark/src/main/java/org/apache/comet/vector/CometDelegateVector.java deleted file mode 100644 index 287408796f..0000000000 --- a/spark/src/main/java/org/apache/comet/vector/CometDelegateVector.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.comet.vector; - -import org.apache.arrow.vector.ValueVector; -import org.apache.arrow.vector.dictionary.DictionaryProvider; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.sql.vectorized.ColumnarArray; -import org.apache.spark.sql.vectorized.ColumnarMap; -import org.apache.spark.unsafe.types.UTF8String; - -/** A special Comet vector that just delegate all calls */ -public class CometDelegateVector extends CometVector { - protected CometVector delegate; - - public CometDelegateVector(DataType dataType) { - this(dataType, null); - } - - public CometDelegateVector(DataType dataType, CometVector delegate) { - super(dataType); - if (delegate instanceof CometDelegateVector) { - throw new IllegalArgumentException("cannot have nested delegation"); - } - this.delegate = delegate; - } - - protected void setDelegate(CometVector delegate) { - this.delegate = delegate; - } - - @Override - public void setNumNulls(int numNulls) { - delegate.setNumNulls(numNulls); - } - - @Override - public void setNumValues(int numValues) { - delegate.setNumValues(numValues); - } - - @Override - public int numValues() { - return delegate.numValues(); - } - - @Override - public boolean hasNull() { - return delegate.hasNull(); - } - - @Override - public int numNulls() { - return delegate.numNulls(); - } - - @Override - public boolean isNullAt(int rowId) { - return delegate.isNullAt(rowId); - } - - @Override - public boolean getBoolean(int rowId) { - return delegate.getBoolean(rowId); - } - - @Override - public byte getByte(int rowId) { - return delegate.getByte(rowId); - } - - @Override - public short getShort(int rowId) { - return delegate.getShort(rowId); - } - - @Override - public int getInt(int rowId) { - return delegate.getInt(rowId); - } - - @Override - public long getLong(int rowId) { - return delegate.getLong(rowId); - } - - @Override - public 
long getLongDecimal(int rowId) { - return delegate.getLongDecimal(rowId); - } - - @Override - public float getFloat(int rowId) { - return delegate.getFloat(rowId); - } - - @Override - public double getDouble(int rowId) { - return delegate.getDouble(rowId); - } - - @Override - public Decimal getDecimal(int i, int precision, int scale) { - return delegate.getDecimal(i, precision, scale); - } - - @Override - byte[] getBinaryDecimal(int i) { - return delegate.getBinaryDecimal(i); - } - - @Override - public UTF8String getUTF8String(int rowId) { - return delegate.getUTF8String(rowId); - } - - @Override - public byte[] getBinary(int rowId) { - return delegate.getBinary(rowId); - } - - @Override - public ColumnarArray getArray(int i) { - return delegate.getArray(i); - } - - @Override - public ColumnarMap getMap(int i) { - return delegate.getMap(i); - } - - @Override - public ColumnVector getChild(int i) { - return delegate.getChild(i); - } - - @Override - public ValueVector getValueVector() { - return delegate.getValueVector(); - } - - @Override - public CometVector slice(int offset, int length) { - return delegate.slice(offset, length); - } - - @Override - public DictionaryProvider getDictionaryProvider() { - return delegate.getDictionaryProvider(); - } -} diff --git a/spark/src/main/java/org/apache/comet/vector/CometDictionary.java b/spark/src/main/java/org/apache/comet/vector/CometDictionary.java index cacfbada8c..da3b15770f 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometDictionary.java +++ b/spark/src/main/java/org/apache/comet/vector/CometDictionary.java @@ -22,7 +22,14 @@ import org.apache.arrow.vector.ValueVector; import org.apache.spark.unsafe.types.UTF8String; -/** A dictionary which maps indices (integers) to values. */ +/** + * Decoded dictionary values referenced by {@link CometDictionaryVector}. + * + *

<p>Wraps the dictionary's {@link CometPlainVector} of values and exposes per-type scalar + * accessors that callers can index by decoded dictionary ID. For decimals, the raw big-endian Arrow + * bytes are reversed and copied into a per-ID cache so that repeated random-access reads do not + * redo the conversion. + */ public class CometDictionary implements AutoCloseable { private static final int DECIMAL_BYTE_WIDTH = 16; diff --git a/spark/src/main/java/org/apache/comet/vector/CometDictionaryVector.java b/spark/src/main/java/org/apache/comet/vector/CometDictionaryVector.java index 15f50b1de9..6f55b118a2 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometDictionaryVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometDictionaryVector.java @@ -25,7 +25,17 @@ import org.apache.parquet.Preconditions; import org.apache.spark.unsafe.types.UTF8String; -/** A column vector whose elements are dictionary-encoded. */ +/** + * A {@link CometDecodedVector} for dictionary-encoded columns, holding an indices vector plus a + * separate {@link CometDictionary} of values. + * + *

Required as a distinct subclass because Spark's {@link + * org.apache.spark.sql.vectorized.ColumnVector} contract exposes per-row scalar accessors (e.g. + * {@code getInt(i)}, {@code getUTF8String(i)}) that must lazily decode {@code values[indices[i]]}; + * Arrow's own {@code DictionaryEncoding} metadata alone does not provide this. Slicing produces a + * vector that aliases the same dictionary; lifecycle tracking via {@code isAlias} prevents + * double-closing the shared dictionary. + */ public class CometDictionaryVector extends CometDecodedVector { public final CometPlainVector indices; public final CometDictionary values; diff --git a/spark/src/main/java/org/apache/comet/vector/CometListVector.java b/spark/src/main/java/org/apache/comet/vector/CometListVector.java index f1e112ec1f..d228b6e549 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometListVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometListVector.java @@ -26,7 +26,10 @@ import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; -/** A Comet column vector for list type. */ +/** + * A {@link CometDecodedVector} for Spark array columns, wrapping an Arrow {@link ListVector} and + * exposing each row as a Spark {@link ColumnarArray} backed by the list's data vector. + */ public class CometListVector extends CometDecodedVector { final ListVector listVector; final ValueVector dataVector; diff --git a/spark/src/main/java/org/apache/comet/vector/CometMapVector.java b/spark/src/main/java/org/apache/comet/vector/CometMapVector.java index 4627a4408e..7a884a077b 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometMapVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometMapVector.java @@ -27,7 +27,11 @@ import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarMap; -/** A Comet column vector for map type. 
*/ +/** + * A {@link CometDecodedVector} for Spark map columns, wrapping an Arrow {@link MapVector} whose + * entries are a struct of {@code (key, value)} and exposing each row as a Spark {@link + * ColumnarMap}. + */ public class CometMapVector extends CometDecodedVector { final MapVector mapVector; final ValueVector dataVector; diff --git a/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java b/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java index d8c4130406..e42f648598 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java @@ -30,7 +30,13 @@ import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.UTF8String; -/** A column vector whose elements are plainly decoded. */ +/** + * A {@link CometDecodedVector} for scalar (non-dictionary, non-complex) columns. + * + *

Reads values directly from the Arrow data buffer via {@link Platform} unsafe operations rather + * than Arrow's per-element getters. This is the hot path for native execution output. Optionally + * decodes 16-byte FixedSizeBinary values as Java {@link UUID}s when {@code isUuid} is set. + */ public class CometPlainVector extends CometDecodedVector { private final long valueBufferAddress; private final boolean isBaseFixedWidthVector; @@ -38,17 +44,11 @@ public class CometPlainVector extends CometDecodedVector { private byte booleanByteCache; private int booleanByteCacheIndex = -1; - private boolean isReused; - public CometPlainVector(ValueVector vector) { this(vector, false); } public CometPlainVector(ValueVector vector, boolean isUuid) { - this(vector, isUuid, false); - } - - public CometPlainVector(ValueVector vector, boolean isUuid, boolean isReused) { super(vector, vector.getField(), isUuid); // NullType doesn't have data buffer. if (vector instanceof NullVector) { @@ -58,21 +58,6 @@ public CometPlainVector(ValueVector vector, boolean isUuid, boolean isReused) { } isBaseFixedWidthVector = valueVector instanceof BaseFixedWidthVector; - this.isReused = isReused; - } - - public boolean isReused() { - return isReused; - } - - public void setReused(boolean isReused) { - this.isReused = isReused; - } - - @Override - public void setNumNulls(int numNulls) { - super.setNumNulls(numNulls); - this.booleanByteCacheIndex = -1; } @Override diff --git a/spark/src/main/java/org/apache/comet/vector/CometSelectionVector.java b/spark/src/main/java/org/apache/comet/vector/CometSelectionVector.java deleted file mode 100644 index 72ce753d08..0000000000 --- a/spark/src/main/java/org/apache/comet/vector/CometSelectionVector.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.vector; - -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.IntVector; -import org.apache.arrow.vector.dictionary.DictionaryProvider; -import org.apache.spark.sql.vectorized.ColumnVector; -import org.apache.spark.sql.vectorized.ColumnarArray; -import org.apache.spark.sql.vectorized.ColumnarMap; -import org.apache.spark.unsafe.types.UTF8String; - -/** - * A zero-copy selection vector that extends CometVector. This implementation stores the original - * data vector and selection indices as separate CometVectors, providing zero copy access to the the - * underlying data. - * - *

<p>If the original vector has values [v0, v1, v2, v3, v4, v5, v6, v7] and the selection indices - * are [0, 1, 3, 4, 5, 7], then this selection vector will logically represent [v0, v1, v3, v4, v5, - * v7] without actually copying the data. - * - *

<p>Most of the implementations of CometVector methods are implemented for completeness. We don't - * use this class except to transfer the original data and the selection indices to the native code. - */ -public class CometSelectionVector extends CometVector { - /** The original vector containing all values */ - private final CometVector values; - - /** - * The valid indices in the values vector. This array is converted into an Arrow vector so we can - * transfer the data to native in one JNI call. This is used to represent the rowid mapping used - * by Iceberg - */ - private final int[] selectionIndices; - - /** - * The indices vector containing selection indices. This is currently allocated by the JVM side - * unlike the values vector which is allocated on the native side - */ - private final CometVector indices; - - /** - * Number of selected elements. The indices array may have a length greater than this but only - * numValues elements in the array are valid - */ - private final int numValues; - - /** - * Creates a new selection vector from the given vector and indices.
- * - * @param values The original vector to select from - * @param indices The indices to select from the original vector - * @param numValues The number of valid values in the indices array - * @throws IllegalArgumentException if any index is out of bounds - */ - public CometSelectionVector(CometVector values, int[] indices, int numValues) { - super(values.dataType()); - - this.values = values; - this.selectionIndices = indices; - this.numValues = numValues; - - // Validate indices - int originalLength = values.numValues(); - for (int idx : indices) { - if (idx < 0 || idx >= originalLength) { - throw new IllegalArgumentException( - String.format( - "Index %d is out of bounds for vector of length %d", idx, originalLength)); - } - } - - // Create indices vector - BufferAllocator allocator = values.getValueVector().getAllocator(); - IntVector indicesVector = new IntVector("selection_indices", allocator); - indicesVector.allocateNew(numValues); - for (int i = 0; i < numValues; i++) { - indicesVector.set(i, indices[i]); - } - indicesVector.setValueCount(numValues); - - this.indices = CometVector.getVector(indicesVector, values.getDictionaryProvider()); - } - - /** - * Returns the index in the values vector for the given selection vector index. - * - * @param selectionIndex Index in the selection vector - * @return The corresponding index in the original vector - * @throws IndexOutOfBoundsException if selectionIndex is out of bounds - */ - private int getValuesIndex(int selectionIndex) { - if (selectionIndex < 0 || selectionIndex >= numValues) { - throw new IndexOutOfBoundsException( - String.format( - "Selection index %d is out of bounds for selection vector of length %d", - selectionIndex, numValues)); - } - return indices.getInt(selectionIndex); - } - - /** - * Returns a reference to the values vector. - * - * @return The CometVector containing the values - */ - public CometVector getValues() { - return values; - } - - /** - * Returns the indices vector. 
- * - * @return The CometVector containing the indices - */ - public CometVector getIndices() { - return indices; - } - - /** - * Returns the selected indices. - * - * @return Array of selected indices - */ - private int[] getSelectedIndices() { - return selectionIndices; - } - - @Override - public int numValues() { - return numValues; - } - - @Override - public void setNumValues(int numValues) { - // For selection vectors, we don't allow changing the number of values - // as it would break the mapping between selection indices and values - throw new UnsupportedOperationException("CometSelectionVector doesn't support setNumValues"); - } - - @Override - public void setNumNulls(int numNulls) { - // For selection vectors, null count should be delegated to the underlying values vector - // The selection doesn't change the null semantics - values.setNumNulls(numNulls); - } - - @Override - public boolean hasNull() { - return values.hasNull(); - } - - @Override - public int numNulls() { - return values.numNulls(); - } - - // ColumnVector method implementations - delegate to original vector with index mapping - @Override - public boolean isNullAt(int rowId) { - return values.isNullAt(getValuesIndex(rowId)); - } - - @Override - public boolean getBoolean(int rowId) { - return values.getBoolean(getValuesIndex(rowId)); - } - - @Override - public byte getByte(int rowId) { - return values.getByte(getValuesIndex(rowId)); - } - - @Override - public short getShort(int rowId) { - return values.getShort(getValuesIndex(rowId)); - } - - @Override - public int getInt(int rowId) { - return values.getInt(getValuesIndex(rowId)); - } - - @Override - public long getLong(int rowId) { - return values.getLong(getValuesIndex(rowId)); - } - - @Override - public long getLongDecimal(int rowId) { - return values.getLongDecimal(getValuesIndex(rowId)); - } - - @Override - public float getFloat(int rowId) { - return values.getFloat(getValuesIndex(rowId)); - } - - @Override - public double getDouble(int 
rowId) { - return values.getDouble(getValuesIndex(rowId)); - } - - @Override - public UTF8String getUTF8String(int rowId) { - return values.getUTF8String(getValuesIndex(rowId)); - } - - @Override - public byte[] getBinary(int rowId) { - return values.getBinary(getValuesIndex(rowId)); - } - - @Override - public ColumnarArray getArray(int rowId) { - return values.getArray(getValuesIndex(rowId)); - } - - @Override - public ColumnarMap getMap(int rowId) { - return values.getMap(getValuesIndex(rowId)); - } - - @Override - public ColumnVector getChild(int ordinal) { - // Return the child from the original vector - return values.getChild(ordinal); - } - - @Override - public DictionaryProvider getDictionaryProvider() { - return values.getDictionaryProvider(); - } - - @Override - public CometVector slice(int offset, int length) { - if (offset < 0 || length < 0 || offset + length > numValues) { - throw new IllegalArgumentException("Invalid slice parameters"); - } - // Get the current indices and slice them - int[] currentIndices = getSelectedIndices(); - int[] slicedIndices = new int[length]; - // This is not a very efficient version of slicing, but that is - // not important because we are not likely to use it. 
- System.arraycopy(currentIndices, offset, slicedIndices, 0, length); - return new CometSelectionVector(values, slicedIndices, length); - } - - @Override - public org.apache.arrow.vector.ValueVector getValueVector() { - return values.getValueVector(); - } - - @Override - public void close() { - // Close both the values and indices vectors - values.close(); - indices.close(); - } -} diff --git a/spark/src/main/java/org/apache/comet/vector/CometStructVector.java b/spark/src/main/java/org/apache/comet/vector/CometStructVector.java index 0e5bc3248a..259793b831 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometStructVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometStructVector.java @@ -28,7 +28,11 @@ import org.apache.arrow.vector.util.TransferPair; import org.apache.spark.sql.vectorized.ColumnVector; -/** A Comet column vector for struct type. */ +/** + * A {@link CometDecodedVector} for Spark struct columns, wrapping an Arrow {@link StructVector} and + * recursively wrapping each child as a {@link CometVector} so {@link #getChild(int)} returns the + * right concrete subtype. + */ public class CometStructVector extends CometDecodedVector { final List children; final DictionaryProvider dictionaryProvider; diff --git a/spark/src/main/java/org/apache/comet/vector/CometVector.java b/spark/src/main/java/org/apache/comet/vector/CometVector.java index 24696dbacb..96c7387999 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometVector.java @@ -38,7 +38,24 @@ import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.UTF8String; -/** Base class for all Comet column vector implementations. */ +/** + * Base class for all Comet column vector implementations. + * + *

<p>Comet wraps Arrow vectors imported from native execution (via the Arrow C Data Interface) in + * this hierarchy so they satisfy Spark's {@link ColumnVector} contract. A custom hierarchy is + * required because: + * + * <ul> + * <li>Arrow shading + * <li>FFI lifecycle + * <li>lazy dictionary decoding + * </ul>

+ */ public abstract class CometVector extends ColumnVector { private static final int DECIMAL_BYTE_WIDTH = 16; private final byte[] DECIMAL_BYTES = new byte[DECIMAL_BYTE_WIDTH]; @@ -60,18 +77,6 @@ public CometVector(DataType type) { super(type); } - /** - * Sets the number of nulls in this vector to be 'numNulls'. This is used when the vector is - * reused across batches. - */ - public abstract void setNumNulls(int numNulls); - - /** - * Sets the number of values (including both nulls and non-nulls) in this vector to be - * 'numValues'. This is used when the vector is reused across batches. - */ - public abstract void setNumValues(int numValues); - /** Returns the number of values in this vector. */ public abstract int numValues(); diff --git a/spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala index cfb9512e80..4f027cd9e7 100644 --- a/spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -116,30 +116,6 @@ class NativeUtil { (0 until batch.numCols()).foreach { index => batch.column(index) match { - case selectionVector: CometSelectionVector => - // For CometSelectionVector, export only the values vector - val valuesVector = selectionVector.getValues - val valueVector = valuesVector.getValueVector - - // Use the selection vector's logical row count - numRows += selectionVector.numValues() - - val provider = if (valueVector.getField.getDictionary != null) { - valuesVector.getDictionaryProvider - } else { - null - } - - // The array and schema structures are allocated by native side. - // Don't need to deallocate them here. 
- val arrowSchema = ArrowSchema.wrap(schemaAddrs(index)) - val arrowArray = ArrowArray.wrap(arrayAddrs(index)) - Data.exportVector( - allocator, - getFieldVector(valueVector, "export"), - provider, - arrowArray, - arrowSchema) case a: CometVector => val valueVector = a.getValueVector @@ -173,44 +149,9 @@ class NativeUtil { s"Number of rows in each column should be the same, but got [${numRows.distinct}]") } - // `ColumnarBatch.numRows` might return a different number than the actual number of rows in - // the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in - // its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report - // logical number of rows which is less than actual number of rows due to row deletion. - // Similarly, CometSelectionVector represents a different number of logical rows than the - // underlying vector. numRows.headOption.getOrElse(batch.numRows()) } - /** - * Exports a single CometVector to native side. - * - * @param vector - * The CometVector to export - * @param arrayAddr - * The address of the ArrowArray structure - * @param schemaAddr - * The address of the ArrowSchema structure - */ - def exportSingleVector(vector: CometVector, arrayAddr: Long, schemaAddr: Long): Unit = { - val valueVector = vector.getValueVector - - val provider = if (valueVector.getField.getDictionary != null) { - vector.getDictionaryProvider - } else { - null - } - - val arrowSchema = ArrowSchema.wrap(schemaAddr) - val arrowArray = ArrowArray.wrap(arrayAddr) - Data.exportVector( - allocator, - getFieldVector(valueVector, "export"), - provider, - arrowArray, - arrowSchema) - } - /** * Gets the next batch from native execution. 
* diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala index d965a6ff7b..2fe870ed06 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala @@ -45,8 +45,6 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.util.{SparkFatalException, Utils} import org.apache.spark.util.io.ChunkedByteBuffer -import org.apache.comet.vector.CometPlainVector - /** * Copied from Spark `ColumnarToRowExec`. Comet needs the fix for SPARK-50235 but cannot wait for * the fix to be released in Spark versions. We copy the implementation here to apply the fix. @@ -263,7 +261,6 @@ case class CometColumnarToRowExec(child: SparkPlan) val writableColumnVectorClz = classOf[WritableColumnVector].getName val constantColumnVectorClz = classOf[ConstantColumnVector].getName - val cometPlainColumnVectorClz = classOf[CometPlainVector].getName // scalastyle:off line.size.limit s""" @@ -280,15 +277,11 @@ case class CometColumnarToRowExec(child: SparkPlan) | } | $idx = $numRows; | - | // Comet fix for SPARK-50235 + | // Comet fix for SPARK-50235: close per-batch vectors. WritableColumnVector and + | // ConstantColumnVector are managed by Spark and must not be closed here. 
| for (int i = 0; i < ${colVars.length}; i++) { - | if (!($batch.column(i) instanceof $writableColumnVectorClz || $batch.column(i) instanceof $constantColumnVectorClz || $batch.column(i) instanceof $cometPlainColumnVectorClz)) { + | if (!($batch.column(i) instanceof $writableColumnVectorClz || $batch.column(i) instanceof $constantColumnVectorClz)) { | $batch.column(i).close(); - | } else if ($batch.column(i) instanceof $cometPlainColumnVectorClz) { - | $cometPlainColumnVectorClz cometPlainColumnVector = ($cometPlainColumnVectorClz) $batch.column(i); - | if (!cometPlainColumnVector.isReused()) { - | cometPlainColumnVector.close(); - | } | } | } |