Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e32dd52
perf: optimize shuffle array element iteration with slice-based append
andygrove Jan 20, 2026
2ea5631
chore: format code
andygrove Jan 20, 2026
a224e22
refactor: remove unnecessary #[inline] from large functions
andygrove Jan 20, 2026
fe54548
fix: address clippy warnings in benchmark
andygrove Jan 20, 2026
471fb2a
perf: optimize struct field processing with field-major order
andygrove Jan 20, 2026
bbe2da7
test: add benchmark for struct column processing
andygrove Jan 20, 2026
f8fe7ba
fix: remove unused import and mut in benchmark
andygrove Jan 20, 2026
b87de09
merge
andygrove Jan 26, 2026
f3da0dc
perf: extend field-major processing to nested struct fields
andygrove Jan 26, 2026
3e8f5b5
perf: batch processing for List and Map columns
andygrove Jan 26, 2026
11a7bca
test: add benchmark for map column processing
andygrove Jan 26, 2026
9b91cfd
refactor: rename struct_conversion benchmark to complex_type_conversion
andygrove Jan 26, 2026
05d72a4
refactor: consolidate and rename shuffle benchmarks
andygrove Jan 26, 2026
0deac66
lint
andygrove Jan 26, 2026
a6123c0
refactor: add safety comments and reduce boilerplate in shuffle code
andygrove Jan 26, 2026
245c647
refactor: improve error handling for builder downcasts
andygrove Jan 26, 2026
ac8c182
lint
andygrove Jan 26, 2026
83d1d30
refactor: add safety comments to remaining unsafe blocks
andygrove Jan 26, 2026
11fe509
refactor: introduce read_row_at! macro to reduce boilerplate
andygrove Jan 26, 2026
ca702ee
refactor: replace unwrap with expect for builder downcasts
andygrove Jan 26, 2026
970e30a
smaller diff
andygrove Jan 26, 2026
1b76054
fix: resolve clippy errors in jni_api.rs
andygrove Jan 26, 2026
738c46e
Merge remote-tracking branch 'apache/main' into shuffle-complex-type-…
andygrove Jan 26, 2026
4c5eb0b
revert: restore row_columnar.rs benchmark and remove jvm_shuffle.rs
andygrove Jan 26, 2026
8bdefb7
upmerge
andygrove Feb 11, 2026
6c48c84
Merge branch 'main' into shuffle-complex-type-perf
andygrove Feb 13, 2026
a2087dd
fix: fix benchmark import path for SparkUnsafeArray list module
andygrove Feb 19, 2026
d46aec5
Merge branch 'main' into shuffle-complex-type-perf
andygrove Feb 19, 2026
cd91a43
Merge remote-tracking branch 'apache/main' into shuffle-complex-type-…
andygrove Feb 24, 2026
b6658da
upmerge
andygrove Feb 26, 2026
9fe6b90
Merge branch 'main' into shuffle-complex-type-perf
andygrove Feb 26, 2026
16f2948
Merge remote-tracking branch 'apache/main' into shuffle-complex-type-…
andygrove Mar 3, 2026
314a36e
Merge branch 'main' into shuffle-complex-type-perf
mbutrovich Mar 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,7 @@ harness = false
[[bench]]
name = "parquet_decode"
harness = false

[[bench]]
name = "array_element_append"
harness = false
272 changes: 272 additions & 0 deletions native/core/benches/array_element_append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Micro-benchmarks for SparkUnsafeArray element iteration.
//!
//! This tests the low-level `append_to_builder` function which converts
//! SparkUnsafeArray elements to Arrow array builders. This is the inner loop
//! used when processing List/Array columns in JVM shuffle.

use arrow::array::builder::{
Date32Builder, Float64Builder, Int32Builder, Int64Builder, TimestampMicrosecondBuilder,
};
use arrow::datatypes::{DataType, TimeUnit};
use comet::execution::shuffle::spark_unsafe::list::{append_to_builder, SparkUnsafeArray};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

const NUM_ELEMENTS: usize = 10000;

/// Create a SparkUnsafeArray in memory with i32 elements.
/// Layout:
/// - 8 bytes: num_elements (i64)
/// - null bitset: 8 bytes per 64 elements
/// - element data: 4 bytes per element (i32)
/// Write the common SparkUnsafeArray layout shared by all element types.
///
/// Layout:
/// - 8 bytes: num_elements (i64, little-endian)
/// - null bitset: 8 bytes per 64 elements (bit i set => element i is null)
/// - element data: `ELEM_SIZE` bytes per element
///
/// When `with_nulls` is true, every 10th element (0, 10, 20, ...) is marked
/// null in the bitset; its data slot is left zeroed.
///
/// `encode` maps an element index to its little-endian byte representation.
fn create_spark_unsafe_array<const ELEM_SIZE: usize>(
    num_elements: usize,
    with_nulls: bool,
    encode: impl Fn(usize) -> [u8; ELEM_SIZE],
) -> Vec<u8> {
    // Header size: 8 (num_elements) + ceil(num_elements/64) * 8 (null bitset)
    let null_bitset_words = num_elements.div_ceil(64);
    let header_size = 8 + null_bitset_words * 8;
    let total_size = header_size + num_elements * ELEM_SIZE;

    let mut buffer = vec![0u8; total_size];

    // Write num_elements
    buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());

    // Write null bitset (set every 10th element as null if with_nulls)
    if with_nulls {
        for i in (0..num_elements).step_by(10) {
            let word_offset = 8 + (i / 64) * 8;
            let current_word =
                i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
            let new_word = current_word | (1i64 << (i % 64));
            buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
        }
    }

    // Write element data
    for i in 0..num_elements {
        let offset = header_size + i * ELEM_SIZE;
        buffer[offset..offset + ELEM_SIZE].copy_from_slice(&encode(i));
    }

    buffer
}

/// Create a SparkUnsafeArray in memory with i32 elements.
/// Layout:
/// - 8 bytes: num_elements (i64)
/// - null bitset: 8 bytes per 64 elements
/// - element data: 4 bytes per element (i32)
fn create_spark_unsafe_array_i32(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    create_spark_unsafe_array::<4>(num_elements, with_nulls, |i| (i as i32).to_le_bytes())
}

/// Create a SparkUnsafeArray in memory with i64 elements.
fn create_spark_unsafe_array_i64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    create_spark_unsafe_array::<8>(num_elements, with_nulls, |i| (i as i64).to_le_bytes())
}

/// Create a SparkUnsafeArray in memory with f64 elements.
fn create_spark_unsafe_array_f64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    create_spark_unsafe_array::<8>(num_elements, with_nulls, |i| (i as f64).to_le_bytes())
}

fn benchmark_array_conversion(c: &mut Criterion) {
let mut group = c.benchmark_group("spark_unsafe_array_to_arrow");

// Benchmark i32 array conversion
for with_nulls in [false, true] {
let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

group.bench_with_input(
BenchmarkId::new("i32", null_str),
&(&array, &buffer),
|b, (array, _buffer)| {
b.iter(|| {
let mut builder = Int32Builder::with_capacity(NUM_ELEMENTS);
if with_nulls {
append_to_builder::<true>(&DataType::Int32, &mut builder, array).unwrap();
} else {
append_to_builder::<false>(&DataType::Int32, &mut builder, array).unwrap();
}
builder.finish()
});
},
);
}

// Benchmark i64 array conversion
for with_nulls in [false, true] {
let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

group.bench_with_input(
BenchmarkId::new("i64", null_str),
&(&array, &buffer),
|b, (array, _buffer)| {
b.iter(|| {
let mut builder = Int64Builder::with_capacity(NUM_ELEMENTS);
if with_nulls {
append_to_builder::<true>(&DataType::Int64, &mut builder, array).unwrap();
} else {
append_to_builder::<false>(&DataType::Int64, &mut builder, array).unwrap();
}
builder.finish()
});
},
);
}

// Benchmark f64 array conversion
for with_nulls in [false, true] {
let buffer = create_spark_unsafe_array_f64(NUM_ELEMENTS, with_nulls);
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

group.bench_with_input(
BenchmarkId::new("f64", null_str),
&(&array, &buffer),
|b, (array, _buffer)| {
b.iter(|| {
let mut builder = Float64Builder::with_capacity(NUM_ELEMENTS);
if with_nulls {
append_to_builder::<true>(&DataType::Float64, &mut builder, array).unwrap();
} else {
append_to_builder::<false>(&DataType::Float64, &mut builder, array)
.unwrap();
}
builder.finish()
});
},
);
}

// Benchmark date32 array conversion (same memory layout as i32)
for with_nulls in [false, true] {
let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

group.bench_with_input(
BenchmarkId::new("date32", null_str),
&(&array, &buffer),
|b, (array, _buffer)| {
b.iter(|| {
let mut builder = Date32Builder::with_capacity(NUM_ELEMENTS);
if with_nulls {
append_to_builder::<true>(&DataType::Date32, &mut builder, array).unwrap();
} else {
append_to_builder::<false>(&DataType::Date32, &mut builder, array).unwrap();
}
builder.finish()
});
},
);
}

// Benchmark timestamp array conversion (same memory layout as i64)
for with_nulls in [false, true] {
let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

group.bench_with_input(
BenchmarkId::new("timestamp", null_str),
&(&array, &buffer),
|b, (array, _buffer)| {
b.iter(|| {
let mut builder = TimestampMicrosecondBuilder::with_capacity(NUM_ELEMENTS);
let dt = DataType::Timestamp(TimeUnit::Microsecond, None);
if with_nulls {
append_to_builder::<true>(&dt, &mut builder, array).unwrap();
} else {
append_to_builder::<false>(&dt, &mut builder, array).unwrap();
}
builder.finish()
});
},
);
}

group.finish();
}

/// Criterion configuration hook for this benchmark group.
/// Currently uses the default settings; adjust here (e.g. sample size,
/// measurement time) when tuning benchmark runs.
fn config() -> Criterion {
    Criterion::default()
}

// Register the benchmark group and generate the benchmark harness entry
// point (Cargo.toml sets `harness = false` for this bench target, so
// criterion_main! supplies `main`).
criterion_group! {
    name = benches;
    config = config();
    targets = benchmark_array_conversion
}
criterion_main!(benches);
2 changes: 1 addition & 1 deletion native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let physical_plan_time = start.elapsed();

exec_context.plan_creation_time += physical_plan_time;
exec_context.root_op = Some(Arc::clone(&root_op));
exec_context.scans = scans;

if exec_context.explain_native {
Expand Down Expand Up @@ -577,6 +576,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
} else {
exec_context.stream = Some(stream);
}
exec_context.root_op = Some(root_op);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this changed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been a month since I made this change, so I do not fully remember, but maybe just removing an unnecessary clone.

} else {
// Pull input batches
pull_input_batches(exec_context)?;
Expand Down
Loading
Loading