perf: Improve performance of native row-to-columnar transition used by JVM shuffle #3289
Open
andygrove wants to merge 33 commits into apache:main from andygrove:shuffle-complex-type-perf
Changes from all commits (33 commits)
- e32dd52 perf: optimize shuffle array element iteration with slice-based append (andygrove)
- 2ea5631 chore: format code (andygrove)
- a224e22 refactor: remove unnecessary #[inline] from large functions (andygrove)
- fe54548 fix: address clippy warnings in benchmark (andygrove)
- 471fb2a perf: optimize struct field processing with field-major order (andygrove)
- bbe2da7 test: add benchmark for struct column processing (andygrove)
- f8fe7ba fix: remove unused import and mut in benchmark (andygrove)
- b87de09 merge (andygrove)
- f3da0dc perf: extend field-major processing to nested struct fields (andygrove)
- 3e8f5b5 perf: batch processing for List and Map columns (andygrove)
- 11a7bca test: add benchmark for map column processing (andygrove)
- 9b91cfd refactor: rename struct_conversion benchmark to complex_type_conversion (andygrove)
- 05d72a4 refactor: consolidate and rename shuffle benchmarks (andygrove)
- 0deac66 lint (andygrove)
- a6123c0 refactor: add safety comments and reduce boilerplate in shuffle code (andygrove)
- 245c647 refactor: improve error handling for builder downcasts (andygrove)
- ac8c182 lint (andygrove)
- 83d1d30 refactor: add safety comments to remaining unsafe blocks (andygrove)
- 11fe509 refactor: introduce read_row_at! macro to reduce boilerplate (andygrove)
- ca702ee refactor: replace unwrap with expect for builder downcasts (andygrove)
- 970e30a smaller difff (andygrove)
- 1b76054 fix: resolve clippy errors in jni_api.rs (andygrove)
- 738c46e Merge remote-tracking branch 'apache/main' into shuffle-complex-type-… (andygrove)
- 4c5eb0b revert: restore row_columnar.rs benchmark and remove jvm_shuffle.rs (andygrove)
- 8bdefb7 upmerge (andygrove)
- 6c48c84 Merge branch 'main' into shuffle-complex-type-perf (andygrove)
- a2087dd fix: fix benchmark import path for SparkUnsafeArray list module (andygrove)
- d46aec5 Merge branch 'main' into shuffle-complex-type-perf (andygrove)
- cd91a43 Merge remote-tracking branch 'apache/main' into shuffle-complex-type-… (andygrove)
- b6658da upmerge (andygrove)
- 9fe6b90 Merge branch 'main' into shuffle-complex-type-perf (andygrove)
- 16f2948 Merge remote-tracking branch 'apache/main' into shuffle-complex-type-… (andygrove)
- 314a36e Merge branch 'main' into shuffle-complex-type-perf (mbutrovich)
New benchmark file (272 lines added):

```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Micro-benchmarks for SparkUnsafeArray element iteration.
//!
//! This tests the low-level `append_to_builder` function which converts
//! SparkUnsafeArray elements to Arrow array builders. This is the inner loop
//! used when processing List/Array columns in JVM shuffle.

use arrow::array::builder::{
    Date32Builder, Float64Builder, Int32Builder, Int64Builder, TimestampMicrosecondBuilder,
};
use arrow::datatypes::{DataType, TimeUnit};
use comet::execution::shuffle::spark_unsafe::list::{append_to_builder, SparkUnsafeArray};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

const NUM_ELEMENTS: usize = 10000;

/// Create a SparkUnsafeArray in memory with i32 elements.
/// Layout:
/// - 8 bytes: num_elements (i64)
/// - null bitset: 8 bytes per 64 elements
/// - element data: 4 bytes per element (i32)
fn create_spark_unsafe_array_i32(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    // Header size: 8 (num_elements) + ceil(num_elements/64) * 8 (null bitset)
    let null_bitset_words = num_elements.div_ceil(64);
    let header_size = 8 + null_bitset_words * 8;
    let data_size = num_elements * 4; // i32 = 4 bytes
    let total_size = header_size + data_size;

    let mut buffer = vec![0u8; total_size];

    // Write num_elements
    buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());

    // Write null bitset (set every 10th element as null if with_nulls)
    if with_nulls {
        for i in (0..num_elements).step_by(10) {
            let word_idx = i / 64;
            let bit_idx = i % 64;
            let word_offset = 8 + word_idx * 8;
            let current_word =
                i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
            let new_word = current_word | (1i64 << bit_idx);
            buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
        }
    }

    // Write element data
    for i in 0..num_elements {
        let offset = header_size + i * 4;
        buffer[offset..offset + 4].copy_from_slice(&(i as i32).to_le_bytes());
    }

    buffer
}

/// Create a SparkUnsafeArray in memory with i64 elements.
fn create_spark_unsafe_array_i64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    let null_bitset_words = num_elements.div_ceil(64);
    let header_size = 8 + null_bitset_words * 8;
    let data_size = num_elements * 8; // i64 = 8 bytes
    let total_size = header_size + data_size;

    let mut buffer = vec![0u8; total_size];

    // Write num_elements
    buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());

    // Write null bitset
    if with_nulls {
        for i in (0..num_elements).step_by(10) {
            let word_idx = i / 64;
            let bit_idx = i % 64;
            let word_offset = 8 + word_idx * 8;
            let current_word =
                i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
            let new_word = current_word | (1i64 << bit_idx);
            buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
        }
    }

    // Write element data
    for i in 0..num_elements {
        let offset = header_size + i * 8;
        buffer[offset..offset + 8].copy_from_slice(&(i as i64).to_le_bytes());
    }

    buffer
}

/// Create a SparkUnsafeArray in memory with f64 elements.
fn create_spark_unsafe_array_f64(num_elements: usize, with_nulls: bool) -> Vec<u8> {
    let null_bitset_words = num_elements.div_ceil(64);
    let header_size = 8 + null_bitset_words * 8;
    let data_size = num_elements * 8; // f64 = 8 bytes
    let total_size = header_size + data_size;

    let mut buffer = vec![0u8; total_size];

    // Write num_elements
    buffer[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());

    // Write null bitset
    if with_nulls {
        for i in (0..num_elements).step_by(10) {
            let word_idx = i / 64;
            let bit_idx = i % 64;
            let word_offset = 8 + word_idx * 8;
            let current_word =
                i64::from_le_bytes(buffer[word_offset..word_offset + 8].try_into().unwrap());
            let new_word = current_word | (1i64 << bit_idx);
            buffer[word_offset..word_offset + 8].copy_from_slice(&new_word.to_le_bytes());
        }
    }

    // Write element data
    for i in 0..num_elements {
        let offset = header_size + i * 8;
        buffer[offset..offset + 8].copy_from_slice(&(i as f64).to_le_bytes());
    }

    buffer
}

fn benchmark_array_conversion(c: &mut Criterion) {
    let mut group = c.benchmark_group("spark_unsafe_array_to_arrow");

    // Benchmark i32 array conversion
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("i32", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = Int32Builder::with_capacity(NUM_ELEMENTS);
                    if with_nulls {
                        append_to_builder::<true>(&DataType::Int32, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&DataType::Int32, &mut builder, array).unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    // Benchmark i64 array conversion
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("i64", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = Int64Builder::with_capacity(NUM_ELEMENTS);
                    if with_nulls {
                        append_to_builder::<true>(&DataType::Int64, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&DataType::Int64, &mut builder, array).unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    // Benchmark f64 array conversion
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_f64(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("f64", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = Float64Builder::with_capacity(NUM_ELEMENTS);
                    if with_nulls {
                        append_to_builder::<true>(&DataType::Float64, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&DataType::Float64, &mut builder, array)
                            .unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    // Benchmark date32 array conversion (same memory layout as i32)
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_i32(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("date32", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = Date32Builder::with_capacity(NUM_ELEMENTS);
                    if with_nulls {
                        append_to_builder::<true>(&DataType::Date32, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&DataType::Date32, &mut builder, array).unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    // Benchmark timestamp array conversion (same memory layout as i64)
    for with_nulls in [false, true] {
        let buffer = create_spark_unsafe_array_i64(NUM_ELEMENTS, with_nulls);
        let array = SparkUnsafeArray::new(buffer.as_ptr() as i64);
        let null_str = if with_nulls { "with_nulls" } else { "no_nulls" };

        group.bench_with_input(
            BenchmarkId::new("timestamp", null_str),
            &(&array, &buffer),
            |b, (array, _buffer)| {
                b.iter(|| {
                    let mut builder = TimestampMicrosecondBuilder::with_capacity(NUM_ELEMENTS);
                    let dt = DataType::Timestamp(TimeUnit::Microsecond, None);
                    if with_nulls {
                        append_to_builder::<true>(&dt, &mut builder, array).unwrap();
                    } else {
                        append_to_builder::<false>(&dt, &mut builder, array).unwrap();
                    }
                    builder.finish()
                });
            },
        );
    }

    group.finish();
}

fn config() -> Criterion {
    Criterion::default()
}

criterion_group! {
    name = benches;
    config = config();
    targets = benchmark_array_conversion
}
criterion_main!(benches);
```
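The header layout that the `create_spark_unsafe_array_*` helpers write (an i64 element count, followed by one little-endian 8-byte word of null bits per 64 elements) can be illustrated with a minimal standalone sketch. The helper names below (`write_header`, `is_null`) are illustrative only and are not part of the benchmark or of Comet's API:

```rust
// Sketch of the UnsafeArray-style header built by the benchmark helpers:
// 8 bytes of element count, then ceil(n/64) little-endian words of null bits.
// Illustrative only; not the Comet implementation.
fn write_header(num_elements: usize, null_indices: &[usize]) -> Vec<u8> {
    let words = num_elements.div_ceil(64);
    let mut buf = vec![0u8; 8 + words * 8];
    // Element count as little-endian i64.
    buf[0..8].copy_from_slice(&(num_elements as i64).to_le_bytes());
    // Set one bit per null element, same word/bit arithmetic as the benchmark.
    for &i in null_indices {
        let off = 8 + (i / 64) * 8;
        let word = i64::from_le_bytes(buf[off..off + 8].try_into().unwrap());
        buf[off..off + 8].copy_from_slice(&(word | (1i64 << (i % 64))).to_le_bytes());
    }
    buf
}

fn is_null(buf: &[u8], i: usize) -> bool {
    let off = 8 + (i / 64) * 8;
    let word = i64::from_le_bytes(buf[off..off + 8].try_into().unwrap());
    word & (1i64 << (i % 64)) != 0
}

fn main() {
    // 100 elements need 2 bitset words, so the header is 8 + 16 = 24 bytes.
    let buf = write_header(100, &[0, 10, 70]);
    assert_eq!(buf.len(), 24);
    assert!(is_null(&buf, 70)); // bit 6 of the second word
    assert!(!is_null(&buf, 1));
}
```

This is why the helpers size the header as `8 + num_elements.div_ceil(64) * 8` before appending the fixed-width element data.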
Why was this changed?
It has been a month since I made this change, so I do not fully remember, but it was probably just removing an unnecessary clone.