From d25b05323828ecf5ca1886892b48e0e7742dc2ba Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Mon, 18 May 2026 03:14:35 -0400 Subject: [PATCH 1/5] Add multi-column group by benchmark --- datafusion/core/Cargo.toml | 5 + datafusion/core/benches/multi_group_by.rs | 219 ++++++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 datafusion/core/benches/multi_group_by.rs diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 5019560da9306..825d5a2318a64 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -294,6 +294,11 @@ harness = false name = "preserve_file_partitioning" required-features = ["parquet"] +[[bench]] +harness = false +name = "multi_group_by" +required-features = ["parquet"] + [[bench]] harness = false name = "reset_plan_states" diff --git a/datafusion/core/benches/multi_group_by.rs b/datafusion/core/benches/multi_group_by.rs new file mode 100644 index 0000000000000..6a5857cd24297 --- /dev/null +++ b/datafusion/core/benches/multi_group_by.rs @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for multi-column GROUP BY performance. +//! +//! Tests the performance of grouping across different cardinality +//! scenarios and column counts. Uses Parquet files so that column +//! statistics (min/max) are available to the optimizer for heuristic +//! decisions about GroupValues implementation selection. +//! +//! The benchmark pre-plans the query and only measures execution time +//! (excludes planning and I/O setup overhead). + +use arrow::array::{ArrayRef, Int32Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use parking_lot::Mutex; +use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; +use tempfile::NamedTempFile; +use tokio::runtime::Runtime; + +const NUM_ROWS: usize = 1_000_000; +const BATCH_SIZE: usize = 8192; + +fn build_group_by_sql(num_cols: usize) -> String { + let cols: Vec = (0..num_cols).map(|i| format!("col_{i}")).collect(); + let col_list = cols.join(", "); + format!("SELECT {col_list} FROM t GROUP BY {col_list}") +} + +fn generate_parquet_file(num_cols: usize, cardinality: usize) -> NamedTempFile { + let mut rng = StdRng::seed_from_u64(42); + let fields: Vec = (0..num_cols) + .map(|i| Field::new(format!("col_{i}"), DataType::Int32, false)) + .collect(); + let schema = Arc::new(Schema::new(fields)); + + let mut temp_file = tempfile::Builder::new() + .prefix("multi_group_by") + .suffix(".parquet") + .tempfile() + .unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(NUM_ROWS)) + .build(); + + let mut writer = + ArrowWriter::try_new(&mut temp_file, Arc::clone(&schema), Some(props)).unwrap(); + + let num_batches = NUM_ROWS / BATCH_SIZE; + for _ in 0..num_batches { + let columns: Vec = (0..num_cols) + .map(|_| { + let values: Vec = (0..BATCH_SIZE) + .map(|_| rng.random_range(0..cardinality as i32)) + .collect(); + Arc::new(Int32Array::from(values)) as ArrayRef + }) + .collect(); + let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap(); + writer.write(&batch).unwrap(); + } + + writer.close().unwrap(); + temp_file +} + +struct BenchContext { + ctx: Arc>, + _temp_file: NamedTempFile, +} + +#[expect(clippy::needless_pass_by_value)] +fn query(ctx: Arc>, rt: &Runtime, sql: &str) { + let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); + black_box(rt.block_on(df.collect()).unwrap()); +} + +fn prepare_context(rt: &Runtime, num_cols: usize, cardinality: usize) -> BenchContext { + let temp_file = generate_parquet_file(num_cols, cardinality); + let path = temp_file.path().to_str().unwrap().to_string(); + + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); + rt.block_on(async { + ctx.register_parquet("t", &path, Default::default()) + .await + .unwrap(); + // Warm the OS page cache + let df = ctx.sql(&build_group_by_sql(num_cols)).await.unwrap(); + let _ = df.collect().await.unwrap(); + }); + + BenchContext { + ctx: Arc::new(Mutex::new(ctx)), + _temp_file: temp_file, + } +} + +fn criterion_benchmark(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + // === Experiment 1: Fixed ~100-1000 groups, vary column count === + let b_ctx = prepare_context(&rt, 2, 10); // 10^2 = 100 groups + c.bench_function("fixed_groups_cols_2_grp_100", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(2))) + }); + + let b_ctx = prepare_context(&rt, 3, 5); // 5^3 = 125 groups + c.bench_function("fixed_groups_cols_3_grp_125", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(3))) + }); + + let b_ctx = prepare_context(&rt, 4, 3); // 3^4 = 81 groups + c.bench_function("fixed_groups_cols_4_grp_81", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) + }); + + let b_ctx = prepare_context(&rt, 6, 3); // 3^6 = 729 groups + c.bench_function("fixed_groups_cols_6_grp_729", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(6))) + }); + + let b_ctx = prepare_context(&rt, 8, 2); // 2^8 = 256 groups + c.bench_function("fixed_groups_cols_8_grp_256", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(8))) + }); + + let b_ctx = prepare_context(&rt, 10, 2); // 2^10 = 1024 groups + c.bench_function("fixed_groups_cols_10_grp_1024", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(10))) + }); + + // === Experiment 1b: High groups (~1M), vary column count === + let b_ctx = prepare_context(&rt, 2, 1000); // 1000^2 = 1M groups + c.bench_function("high_groups_cols_2_grp_1M", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(2))) + }); + + let b_ctx = prepare_context(&rt, 3, 100); // 100^3 = 1M groups + c.bench_function("high_groups_cols_3_grp_1M", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(3))) + }); + + let b_ctx = prepare_context(&rt, 4, 32); // 32^4 = ~1M groups + c.bench_function("high_groups_cols_4_grp_1M", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) + }); + + let b_ctx = prepare_context(&rt, 6, 10); // 10^6 = 1M groups + c.bench_function("high_groups_cols_6_grp_1M", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(6))) + }); + + let b_ctx = prepare_context(&rt, 8, 6); // 6^8 = ~1.7M groups + c.bench_function("high_groups_cols_8_grp_1M", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(8))) + }); + + let b_ctx = prepare_context(&rt, 10, 4); // 4^10 = ~1M groups + c.bench_function("high_groups_cols_10_grp_1M", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(10))) + }); + + // === Experiment 2: Fixed 4 columns, vary group count === + let b_ctx = prepare_context(&rt, 4, 2); // 2^4 = 16 groups + c.bench_function("fixed_4cols_grp_16", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) + }); + + let b_ctx = prepare_context(&rt, 4, 5); // 5^4 = 625 groups + c.bench_function("fixed_4cols_grp_625", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) + }); + + let b_ctx = prepare_context(&rt, 4, 10); // 10^4 = 10K groups + c.bench_function("fixed_4cols_grp_10000", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) + }); + + let b_ctx = prepare_context(&rt, 4, 30); // 30^4 = 810K groups + c.bench_function("fixed_4cols_grp_810000", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) + }); + + let b_ctx = prepare_context(&rt, 4, 100); // 100^4 = 100M groups + c.bench_function("fixed_4cols_grp_100M", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) + }); + + let b_ctx = prepare_context(&rt, 4, 500); // 500^4 = 62.5B groups + c.bench_function("fixed_4cols_grp_62B", |b| { + b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From 135a39bbf0a2c6ed9df14dd18acaff9264619d53 Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Wed, 20 May 2026 10:58:00 -0400 Subject: [PATCH 2/5] Address review comments: compare vectorized vs row-based, fix planning overhead, exact NDV Addresses all three review comments from @kosiew: 1. **Implementation comparison**: Benchmarks both GroupValuesColumn (vectorized, via Int32 columns) and GroupValuesRows (row-based, via FixedSizeBinary(4) columns that trigger the fallback path) side-by-side. 2. **Execution-only timing**: Pre-optimizes the logical plan once via `df.into_parts()`. Each benchmark iteration only does physical planning + execution, excluding SQL parsing and logical optimization. 3. **Exact cardinality**: Replaces random sampling with sequential enumeration (`global_row % num_distinct_groups` decomposed per-column), guaranteeing precise distinct group counts with no birthday-paradox error. Additionally motivated by https://github.com/apache/datafusion/issues/17850, adds comprehensive experiments: - Issue #17850 regression reproduction (3 cols, 64 groups, 1M-50M rows) - Low cardinality sweep (8-4096 groups) - Batch size sensitivity (1K-32K) - Column count scaling (2-10 cols, low and high cardinality) - Group count sweep (16 to 1M groups) - Random vs sequential data patterns Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/core/benches/multi_group_by.rs | 545 +++++++++++++++++----- 1 file changed, 419 insertions(+), 126 deletions(-) diff --git a/datafusion/core/benches/multi_group_by.rs b/datafusion/core/benches/multi_group_by.rs index 6a5857cd24297..6279848bf3b61 100644 --- a/datafusion/core/benches/multi_group_by.rs +++ b/datafusion/core/benches/multi_group_by.rs @@ -15,21 +15,29 @@ // specific language governing permissions and limitations // under the License. -//! Benchmarks for multi-column GROUP BY performance. +//! Benchmarks for multi-column GROUP BY performance comparing vectorized +//! (`GroupValuesColumn`) vs row-based (`GroupValuesRows`) implementations. //! -//! Tests the performance of grouping across different cardinality -//! scenarios and column counts. Uses Parquet files so that column -//! statistics (min/max) are available to the optimizer for heuristic -//! decisions about GroupValues implementation selection. +//! Motivated by which +//! showed vectorized can regress 33% for low-cardinality, high-row-count +//! scenarios (3 cols, 64 groups, 1B rows). //! -//! The benchmark pre-plans the query and only measures execution time -//! (excludes planning and I/O setup overhead). - -use arrow::array::{ArrayRef, Int32Array, RecordBatch}; +//! Design: +//! - **Implementation comparison**: Int32 columns trigger the vectorized path; +//! FixedSizeBinary(4) columns trigger the row-based fallback. +//! - **Execution-only timing**: Pre-optimizes the logical plan once; each +//! iteration only does physical planning + execution. +//! - **Exact cardinality**: Sequential enumeration guarantees precise NDV. +//! - **Regression scenario**: Reproduces issue #17850 conditions (few groups, +//! many rows) to identify the crossover point. + +use arrow::array::{ArrayRef, FixedSizeBinaryArray, Int32Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; -use criterion::{Criterion, criterion_group, criterion_main}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion::execution::SessionState; +use datafusion::physical_plan::collect; use datafusion::prelude::{SessionConfig, SessionContext}; -use parking_lot::Mutex; +use datafusion_expr::LogicalPlan; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; use rand::rngs::StdRng; @@ -39,8 +47,41 @@ use std::sync::Arc; use tempfile::NamedTempFile; use tokio::runtime::Runtime; -const NUM_ROWS: usize = 1_000_000; -const BATCH_SIZE: usize = 8192; +const DEFAULT_NUM_ROWS: usize = 1_000_000; +const DEFAULT_BATCH_SIZE: usize = 8192; + +#[derive(Clone, Copy, PartialEq, Eq)] +enum GroupMode { + Vectorized, + RowBased, +} + +impl std::fmt::Display for GroupMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GroupMode::Vectorized => write!(f, "vectorized"), + GroupMode::RowBased => write!(f, "row_based"), + } + } +} + +#[derive(Clone, Copy, PartialEq, Eq)] +enum DataPattern { + /// Sequential enumeration: deterministic cycling through all group combinations. + Sequential, + /// Random: each column independently picks a value from 0..per_col_card using a seeded RNG. + /// This matches the data generation in issue #17850 (random() * 4)::integer. + Random, +} + +impl std::fmt::Display for DataPattern { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DataPattern::Sequential => write!(f, "sequential"), + DataPattern::Random => write!(f, "random"), + } + } +} fn build_group_by_sql(num_cols: usize) -> String { let cols: Vec = (0..num_cols).map(|i| format!("col_{i}")).collect(); @@ -48,13 +89,29 @@ fn build_group_by_sql(num_cols: usize) -> String { format!("SELECT {col_list} FROM t GROUP BY {col_list}") } -fn generate_parquet_file(num_cols: usize, cardinality: usize) -> NamedTempFile { - let mut rng = StdRng::seed_from_u64(42); +fn generate_parquet_file( + num_cols: usize, + num_distinct_groups: usize, + num_rows: usize, + batch_size: usize, + mode: GroupMode, + pattern: DataPattern, +) -> NamedTempFile { let fields: Vec = (0..num_cols) - .map(|i| Field::new(format!("col_{i}"), DataType::Int32, false)) + .map(|i| { + let dt = match mode { + GroupMode::Vectorized => DataType::Int32, + GroupMode::RowBased => DataType::FixedSizeBinary(4), + }; + Field::new(format!("col_{i}"), dt, false) + }) .collect(); let schema = Arc::new(Schema::new(fields)); + let per_col_card = (num_distinct_groups as f64) + .powf(1.0 / num_cols as f64) + .ceil() as usize; + let mut temp_file = tempfile::Builder::new() .prefix("multi_group_by") .suffix(".parquet") @@ -62,20 +119,56 @@ fn generate_parquet_file(num_cols: usize, cardinality: usize) -> NamedTempFile { .unwrap(); let props = WriterProperties::builder() - .set_max_row_group_row_count(Some(NUM_ROWS)) + .set_max_row_group_row_count(Some(num_rows)) .build(); let mut writer = ArrowWriter::try_new(&mut temp_file, Arc::clone(&schema), Some(props)).unwrap(); - let num_batches = NUM_ROWS / BATCH_SIZE; - for _ in 0..num_batches { + let mut rng = StdRng::seed_from_u64(42); + let num_batches = num_rows / batch_size; + for batch_idx in 0..num_batches { + let batch_start = batch_idx * batch_size; let columns: Vec = (0..num_cols) - .map(|_| { - let values: Vec = (0..BATCH_SIZE) - .map(|_| rng.random_range(0..cardinality as i32)) - .collect(); - Arc::new(Int32Array::from(values)) as ArrayRef + .map(|col_idx| match mode { + GroupMode::Vectorized => { + let values: Vec = (0..batch_size) + .map(|row| match pattern { + DataPattern::Sequential => { + let global_row = batch_start + row; + let group_id = global_row % num_distinct_groups; + let divisor = per_col_card.pow(col_idx as u32); + ((group_id / divisor) % per_col_card) as i32 + } + DataPattern::Random => { + rng.random_range(0..per_col_card as i32) + } + }) + .collect(); + Arc::new(Int32Array::from(values)) as ArrayRef + } + GroupMode::RowBased => { + let values: Vec> = (0..batch_size) + .map(|row| { + let val = match pattern { + DataPattern::Sequential => { + let global_row = batch_start + row; + let group_id = global_row % num_distinct_groups; + let divisor = per_col_card.pow(col_idx as u32); + ((group_id / divisor) % per_col_card) as i32 + } + DataPattern::Random => { + rng.random_range(0..per_col_card as i32) + } + }; + val.to_le_bytes().to_vec() + }) + .collect(); + let refs: Vec<&[u8]> = values.iter().map(|v| v.as_slice()).collect(); + Arc::new( + FixedSizeBinaryArray::try_from_iter(refs.into_iter()).unwrap(), + ) as ArrayRef + } }) .collect(); let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap(); @@ -87,133 +180,333 @@ fn generate_parquet_file(num_cols: usize, cardinality: usize) -> NamedTempFile { } struct BenchContext { - ctx: Arc>, + session_state: SessionState, + logical_plan: LogicalPlan, _temp_file: NamedTempFile, } -#[expect(clippy::needless_pass_by_value)] -fn query(ctx: Arc>, rt: &Runtime, sql: &str) { - let df = rt.block_on(ctx.lock().sql(sql)).unwrap(); - black_box(rt.block_on(df.collect()).unwrap()); -} - -fn prepare_context(rt: &Runtime, num_cols: usize, cardinality: usize) -> BenchContext { - let temp_file = generate_parquet_file(num_cols, cardinality); +fn prepare_context( + rt: &Runtime, + num_cols: usize, + num_distinct_groups: usize, + num_rows: usize, + batch_size: usize, + mode: GroupMode, + pattern: DataPattern, +) -> BenchContext { + let temp_file = generate_parquet_file( + num_cols, + num_distinct_groups, + num_rows, + batch_size, + mode, + pattern, + ); let path = temp_file.path().to_str().unwrap().to_string(); - let config = SessionConfig::new().with_target_partitions(1); + let config = SessionConfig::new() + .with_target_partitions(1) + .with_batch_size(batch_size); let ctx = SessionContext::new_with_config(config); - rt.block_on(async { + let sql = build_group_by_sql(num_cols); + + let (session_state, logical_plan) = rt.block_on(async { ctx.register_parquet("t", &path, Default::default()) .await .unwrap(); // Warm the OS page cache - let df = ctx.sql(&build_group_by_sql(num_cols)).await.unwrap(); - let _ = df.collect().await.unwrap(); + let warmup_df = ctx.sql(&sql).await.unwrap(); + let _ = warmup_df.collect().await.unwrap(); + // Pre-optimize: parse SQL and produce optimized logical plan once + let df = ctx.sql(&sql).await.unwrap(); + df.into_parts() }); BenchContext { - ctx: Arc::new(Mutex::new(ctx)), + session_state, + logical_plan, _temp_file: temp_file, } } -fn criterion_benchmark(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - - // === Experiment 1: Fixed ~100-1000 groups, vary column count === - let b_ctx = prepare_context(&rt, 2, 10); // 10^2 = 100 groups - c.bench_function("fixed_groups_cols_2_grp_100", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(2))) - }); - - let b_ctx = prepare_context(&rt, 3, 5); // 5^3 = 125 groups - c.bench_function("fixed_groups_cols_3_grp_125", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(3))) - }); - - let b_ctx = prepare_context(&rt, 4, 3); // 3^4 = 81 groups - c.bench_function("fixed_groups_cols_4_grp_81", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) - }); - - let b_ctx = prepare_context(&rt, 6, 3); // 3^6 = 729 groups - c.bench_function("fixed_groups_cols_6_grp_729", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(6))) - }); - - let b_ctx = prepare_context(&rt, 8, 2); // 2^8 = 256 groups - c.bench_function("fixed_groups_cols_8_grp_256", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(8))) - }); - - let b_ctx = prepare_context(&rt, 10, 2); // 2^10 = 1024 groups - c.bench_function("fixed_groups_cols_10_grp_1024", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(10))) - }); - - // === Experiment 1b: High groups (~1M), vary column count === - let b_ctx = prepare_context(&rt, 2, 1000); // 1000^2 = 1M groups - c.bench_function("high_groups_cols_2_grp_1M", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(2))) - }); - - let b_ctx = prepare_context(&rt, 3, 100); // 100^3 = 1M groups - c.bench_function("high_groups_cols_3_grp_1M", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(3))) - }); - - let b_ctx = prepare_context(&rt, 4, 32); // 32^4 = ~1M groups - c.bench_function("high_groups_cols_4_grp_1M", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) - }); - - let b_ctx = prepare_context(&rt, 6, 10); // 10^6 = 1M groups - c.bench_function("high_groups_cols_6_grp_1M", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(6))) - }); - - let b_ctx = prepare_context(&rt, 8, 6); // 6^8 = ~1.7M groups - c.bench_function("high_groups_cols_8_grp_1M", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(8))) +fn execute_query(rt: &Runtime, state: &SessionState, plan: &LogicalPlan) { + rt.block_on(async { + let physical_plan = state.create_physical_plan(plan).await.unwrap(); + let task_ctx = state.task_ctx(); + let results = collect(physical_plan, task_ctx).await.unwrap(); + black_box(results); }); +} - let b_ctx = prepare_context(&rt, 10, 4); // 4^10 = ~1M groups - c.bench_function("high_groups_cols_10_grp_1M", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(10))) - }); +/// Experiment 1: Reproduce issue #17850 scenario. +/// 3 columns, cardinality 4 per column (=64 groups), varying row counts. +/// This is the exact configuration where vectorized regressed 33%. +fn bench_issue_17850_regression(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("issue_17850_regression"); + group.sample_size(10); + + let num_cols = 3; + let num_groups = 64; // 4^3 + + // Scale row count to find where overhead dominates + for num_rows in [1_000_000, 5_000_000, 10_000_000, 20_000_000, 50_000_000] { + for mode in [GroupMode::Vectorized, GroupMode::RowBased] { + let b_ctx = prepare_context( + &rt, + num_cols, + num_groups, + num_rows, + DEFAULT_BATCH_SIZE, + mode, + DataPattern::Sequential, + ); + group.bench_with_input( + BenchmarkId::new(format!("{mode}"), format!("{num_rows}_rows")), + &num_rows, + |b, _| { + b.iter(|| { + execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) + }) + }, + ); + } + } + group.finish(); +} - // === Experiment 2: Fixed 4 columns, vary group count === - let b_ctx = prepare_context(&rt, 4, 2); // 2^4 = 16 groups - c.bench_function("fixed_4cols_grp_16", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) - }); +/// Experiment 2: Low cardinality sweep. +/// Fixed 3-4 columns, very few groups (4-256), default row count. +/// Tests the per-batch overhead sensitivity. +fn bench_low_cardinality(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("low_cardinality"); + group.sample_size(15); + + // 3 cols with varying per-column cardinality + for (num_cols, per_col_card) in + [(3usize, 2usize), (3, 4), (3, 8), (4, 2), (4, 4), (4, 8)] + { + let num_groups = per_col_card.pow(num_cols as u32); + for mode in [GroupMode::Vectorized, GroupMode::RowBased] { + let b_ctx = prepare_context( + &rt, + num_cols, + num_groups, + DEFAULT_NUM_ROWS, + DEFAULT_BATCH_SIZE, + mode, + DataPattern::Sequential, + ); + group.bench_with_input( + BenchmarkId::new( + format!("{mode}"), + format!("cols_{num_cols}_card_{per_col_card}_grp_{num_groups}"), + ), + &num_groups, + |b, _| { + b.iter(|| { + execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) + }) + }, + ); + } + } + group.finish(); +} - let b_ctx = prepare_context(&rt, 4, 5); // 5^4 = 625 groups - c.bench_function("fixed_4cols_grp_625", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) - }); +/// Experiment 3: Batch size sensitivity. +/// Tests how different batch sizes affect the per-batch overhead ratio. +/// Issue #17850 suggests vectorized has higher fixed per-batch cost. +fn bench_batch_size_sensitivity(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("batch_size_sensitivity"); + group.sample_size(10); + + let num_cols = 3; + let num_groups = 64; // 4^3, same as issue #17850 + + for batch_size in [1024, 4096, 8192, 16384, 32768] { + for mode in [GroupMode::Vectorized, GroupMode::RowBased] { + let b_ctx = prepare_context( + &rt, + num_cols, + num_groups, + DEFAULT_NUM_ROWS, + batch_size, + mode, + DataPattern::Sequential, + ); + group.bench_with_input( + BenchmarkId::new(format!("{mode}"), format!("batch_{batch_size}")), + &batch_size, + |b, _| { + b.iter(|| { + execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) + }) + }, + ); + } + } + group.finish(); +} - let b_ctx = prepare_context(&rt, 4, 10); // 10^4 = 10K groups - c.bench_function("fixed_4cols_grp_10000", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) - }); +/// Experiment 4: Column count scaling. +/// Fixed low group count, increasing columns to isolate per-column overhead. +fn bench_column_scaling(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("column_scaling"); + group.sample_size(15); + + let fixed_low_cases: &[(usize, usize)] = + &[(2, 100), (3, 125), (4, 81), (6, 729), (8, 256), (10, 1024)]; + + for &(num_cols, num_groups) in fixed_low_cases { + for mode in [GroupMode::Vectorized, GroupMode::RowBased] { + let b_ctx = prepare_context( + &rt, + num_cols, + num_groups, + DEFAULT_NUM_ROWS, + DEFAULT_BATCH_SIZE, + mode, + DataPattern::Sequential, + ); + group.bench_with_input( + BenchmarkId::new( + format!("{mode}"), + format!("cols_{num_cols}_grp_{num_groups}"), + ), + &num_cols, + |b, _| { + b.iter(|| { + execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) + }) + }, + ); + } + } + group.finish(); +} - let b_ctx = prepare_context(&rt, 4, 30); // 30^4 = 810K groups - c.bench_function("fixed_4cols_grp_810000", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) - }); +/// Experiment 5: High cardinality column scaling. +/// ~1M groups with increasing column count. +fn bench_high_cardinality_scaling(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("high_cardinality_scaling"); + group.sample_size(10); + + for num_cols in [2, 3, 4, 6, 8, 10] { + let num_groups = DEFAULT_NUM_ROWS; // 1M groups = ~unique per row + for mode in [GroupMode::Vectorized, GroupMode::RowBased] { + let b_ctx = prepare_context( + &rt, + num_cols, + num_groups, + DEFAULT_NUM_ROWS, + DEFAULT_BATCH_SIZE, + mode, + DataPattern::Sequential, + ); + group.bench_with_input( + BenchmarkId::new(format!("{mode}"), format!("cols_{num_cols}_grp_1M")), + &num_cols, + |b, _| { + b.iter(|| { + execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) + }) + }, + ); + } + } + group.finish(); +} - let b_ctx = prepare_context(&rt, 4, 100); // 100^4 = 100M groups - c.bench_function("fixed_4cols_grp_100M", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) - }); +/// Experiment 6: Group count crossover search. +/// Fixed 4 columns, sweep group count to find crossover point. +fn bench_group_count_sweep(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("group_count_sweep"); + group.sample_size(15); + + for num_groups in [ + 16, 64, 256, 1000, 5000, 10_000, 50_000, 100_000, 500_000, 1_000_000, + ] { + for mode in [GroupMode::Vectorized, GroupMode::RowBased] { + let b_ctx = prepare_context( + &rt, + 4, + num_groups, + DEFAULT_NUM_ROWS, + DEFAULT_BATCH_SIZE, + mode, + DataPattern::Sequential, + ); + group.bench_with_input( + BenchmarkId::new(format!("{mode}"), format!("grp_{num_groups}")), + &num_groups, + |b, _| { + b.iter(|| { + execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) + }) + }, + ); + } + } + group.finish(); +} - let b_ctx = prepare_context(&rt, 4, 500); // 500^4 = 62.5B groups - c.bench_function("fixed_4cols_grp_62B", |b| { - b.iter(|| query(b_ctx.ctx.clone(), &rt, &build_group_by_sql(4))) - }); +/// Experiment 7: Random vs sequential data pattern. +/// Tests whether random access patterns (matching issue #17850's exact SQL using +/// `(random() * 4)::integer`) change the performance characteristics due to cache +/// effects in hash probing. +/// 3 cols, per_col_card=4 (64 theoretical groups), 1M and 5M rows. +fn bench_random_vs_sequential(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("random_vs_sequential"); + group.sample_size(10); + + let num_cols = 3; + let num_groups = 64; // 4^3 + + for num_rows in [1_000_000, 5_000_000] { + for pattern in [DataPattern::Sequential, DataPattern::Random] { + for mode in [GroupMode::Vectorized, GroupMode::RowBased] { + let b_ctx = prepare_context( + &rt, + num_cols, + num_groups, + num_rows, + DEFAULT_BATCH_SIZE, + mode, + pattern, + ); + group.bench_with_input( + BenchmarkId::new( + format!("{mode}/{pattern}"), + format!("{num_rows}_rows"), + ), + &num_rows, + |b, _| { + b.iter(|| { + execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) + }) + }, + ); + } + } + } + group.finish(); } -criterion_group!(benches, criterion_benchmark); +criterion_group!( + benches, + bench_issue_17850_regression, + bench_low_cardinality, + bench_batch_size_sensitivity, + bench_column_scaling, + bench_high_cardinality_scaling, + bench_group_count_sweep, + bench_random_vs_sequential, +); criterion_main!(benches); From fdd0c928bbeec63ba67241d929c66f850f9ffe46 Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Wed, 20 May 2026 12:07:49 -0400 Subject: [PATCH 3/5] Add direct intern() benchmark for vectorized vs row-based GROUP BY Adds a fair apples-to-apples benchmark that directly calls GroupValues::intern() with identical Int32 data for both GroupValuesColumn (vectorized) and GroupValuesRows (row-based). This eliminates the previous confounding factors (different data types, SQL/planning overhead) and confirms the regression reported in #17850: row-based is 16-19% faster at low cardinality (64 groups), with a crossover at ~200K-500K groups where vectorized becomes faster. Experiments: - Issue #17850 reproduction (3 cols, 64 groups, 1M-50M rows) - Low cardinality sweep (8-4096 groups) - Batch size sensitivity (1K-32K) - Column count scaling (2-10 cols) - High cardinality scaling (1M groups) - Group count sweep (16 to 1M groups) - Random vs sequential data patterns --- datafusion/physical-plan/Cargo.toml | 4 + .../physical-plan/benches/multi_group_by.rs | 450 ++++++++++++++++++ .../src/aggregates/group_values/mod.rs | 2 +- 3 files changed, 455 insertions(+), 1 deletion(-) create mode 100644 datafusion/physical-plan/benches/multi_group_by.rs diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index c6710262776c7..d4f1b379193de 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -116,3 +116,7 @@ required-features = ["test_utils"] [[bench]] harness = false name = "dictionary_group_values" + +[[bench]] +harness = false +name = "multi_group_by" diff --git a/datafusion/physical-plan/benches/multi_group_by.rs b/datafusion/physical-plan/benches/multi_group_by.rs new file mode 100644 index 0000000000000..ca1606044368c --- /dev/null +++ b/datafusion/physical-plan/benches/multi_group_by.rs @@ -0,0 +1,450 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for multi-column GROUP BY performance comparing vectorized +//! (`GroupValuesColumn`) vs row-based (`GroupValuesRows`) implementations. +//! +//! Motivated by which +//! showed vectorized can regress for low-cardinality, high-row-count scenarios. +//! +//! Uses the direct `GroupValues::intern()` API with identical Int32 data for +//! both implementations — a fair apples-to-apples comparison with the same +//! hashing and data layout. + +use arrow::array::{ArrayRef, Int32Array}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_physical_plan::aggregates::group_values::GroupValues; +use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupValuesColumn; +use datafusion_physical_plan::aggregates::group_values::row::GroupValuesRows; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +const DEFAULT_BATCH_SIZE: usize = 8192; + +fn make_schema(num_cols: usize) -> SchemaRef { + let fields: Vec = (0..num_cols) + .map(|i| Field::new(format!("col_{i}"), DataType::Int32, false)) + .collect(); + Arc::new(Schema::new(fields)) +} + +#[derive(Clone, Copy)] +enum DataPattern { + Sequential, + Random, +} + +fn generate_batches( + num_cols: usize, + num_distinct_groups: usize, + num_rows: usize, + batch_size: usize, + pattern: DataPattern, +) -> Vec> { + let per_col_card = (num_distinct_groups as f64) + .powf(1.0 / num_cols as f64) + .ceil() as usize; + + let mut rng = StdRng::seed_from_u64(42); + let num_batches = num_rows / batch_size; + + (0..num_batches) + .map(|batch_idx| { + let batch_start = batch_idx * batch_size; + (0..num_cols) + .map(|col_idx| { + let values: Vec = (0..batch_size) + .map(|row| match pattern { + DataPattern::Sequential => { + let global_row = batch_start + row; + let group_id = global_row % num_distinct_groups; + let divisor = per_col_card.pow(col_idx as u32); + ((group_id / divisor) % per_col_card) as i32 + } + DataPattern::Random => { + rng.random_range(0..per_col_card as i32) + } + }) + .collect(); + Arc::new(Int32Array::from(values)) as ArrayRef + }) + .collect() + }) + .collect() +} + +fn create_group_values(schema: &SchemaRef, vectorized: bool) -> Box { + if vectorized { + Box::new(GroupValuesColumn::::try_new(Arc::clone(schema)).unwrap()) + } else { + Box::new(GroupValuesRows::try_new(Arc::clone(schema)).unwrap()) + } +} + +fn bench_intern( + gv: &mut Box, + batches: &[Vec], + groups: &mut Vec, +) { + for batch in batches { + groups.clear(); + gv.intern(batch, groups).unwrap(); + } + black_box(&*groups); +} + +/// Experiment 1: Issue #17850 regression scenario. +/// 3 columns, 64 groups (4^3), scaling row count. +fn bench_issue_17850_regression(c: &mut Criterion) { + let mut group = c.benchmark_group("issue_17850_regression"); + group.sample_size(10); + + let num_cols = 3; + let num_groups = 64; + let schema = make_schema(num_cols); + + for num_rows in [1_000_000, 5_000_000, 10_000_000, 20_000_000, 50_000_000] { + let batches = generate_batches( + num_cols, + num_groups, + num_rows, + DEFAULT_BATCH_SIZE, + DataPattern::Sequential, + ); + + for vectorized in [true, false] { + let label = if vectorized { + "vectorized" + } else { + "row_based" + }; + group.bench_with_input( + BenchmarkId::new(label, format!("{num_rows}_rows")), + &batches, + |b, batches| { + b.iter_batched_ref( + || { + ( + create_group_values(&schema, vectorized), + Vec::::with_capacity(DEFAULT_BATCH_SIZE), + ) + }, + |(gv, groups)| bench_intern(gv, batches, groups), + criterion::BatchSize::LargeInput, + ); + }, + ); + } + } + group.finish(); +} + +/// Experiment 2: Low cardinality sweep. +fn bench_low_cardinality(c: &mut Criterion) { + let mut group = c.benchmark_group("low_cardinality"); + group.sample_size(15); + + for (num_cols, per_col_card) in + [(3usize, 2usize), (3, 4), (3, 8), (4, 2), (4, 4), (4, 8)] + { + let num_groups = per_col_card.pow(num_cols as u32); + let schema = make_schema(num_cols); + let batches = generate_batches( + num_cols, + num_groups, + 1_000_000, + DEFAULT_BATCH_SIZE, + DataPattern::Sequential, + ); + + for vectorized in [true, false] { + let label = if vectorized { + "vectorized" + } else { + "row_based" + }; + group.bench_with_input( + BenchmarkId::new( + label, + format!("cols_{num_cols}_card_{per_col_card}_grp_{num_groups}"), + ), + &batches, + |b, batches| { + b.iter_batched_ref( + || { + ( + create_group_values(&schema, vectorized), + Vec::::with_capacity(DEFAULT_BATCH_SIZE), + ) + }, + |(gv, groups)| bench_intern(gv, batches, groups), + criterion::BatchSize::LargeInput, + ); + }, + ); + } + } + group.finish(); +} + +/// Experiment 3: Batch size sensitivity. +fn bench_batch_size_sensitivity(c: &mut Criterion) { + let mut group = c.benchmark_group("batch_size_sensitivity"); + group.sample_size(10); + + let num_cols = 3; + let num_groups = 64; + let schema = make_schema(num_cols); + + for batch_size in [1024, 4096, 8192, 16384, 32768] { + let batches = generate_batches( + num_cols, + num_groups, + 1_000_000, + batch_size, + DataPattern::Sequential, + ); + + for vectorized in [true, false] { + let label = if vectorized { + "vectorized" + } else { + "row_based" + }; + group.bench_with_input( + BenchmarkId::new(label, format!("batch_{batch_size}")), + &batches, + |b, batches| { + b.iter_batched_ref( + || { + ( + create_group_values(&schema, vectorized), + Vec::::with_capacity(batch_size), + ) + }, + |(gv, groups)| bench_intern(gv, batches, groups), + criterion::BatchSize::LargeInput, + ); + }, + ); + } + } + group.finish(); +} + +/// Experiment 4: Column count scaling with low groups. +fn bench_column_scaling(c: &mut Criterion) { + let mut group = c.benchmark_group("column_scaling"); + group.sample_size(15); + + let cases: &[(usize, usize)] = + &[(2, 100), (3, 125), (4, 81), (6, 729), (8, 256), (10, 1024)]; + + for &(num_cols, num_groups) in cases { + let schema = make_schema(num_cols); + let batches = generate_batches( + num_cols, + num_groups, + 1_000_000, + DEFAULT_BATCH_SIZE, + DataPattern::Sequential, + ); + + for vectorized in [true, false] { + let label = if vectorized { + "vectorized" + } else { + "row_based" + }; + group.bench_with_input( + BenchmarkId::new(label, format!("cols_{num_cols}_grp_{num_groups}")), + &batches, + |b, batches| { + b.iter_batched_ref( + || { + ( + create_group_values(&schema, vectorized), + Vec::::with_capacity(DEFAULT_BATCH_SIZE), + ) + }, + |(gv, groups)| bench_intern(gv, batches, groups), + criterion::BatchSize::LargeInput, + ); + }, + ); + } + } + group.finish(); +} + +/// Experiment 5: High cardinality column scaling (~1M groups). +fn bench_high_cardinality_scaling(c: &mut Criterion) { + let mut group = c.benchmark_group("high_cardinality_scaling"); + group.sample_size(10); + + for num_cols in [2, 3, 4, 6, 8, 10] { + let num_groups = 1_000_000; + let schema = make_schema(num_cols); + let batches = generate_batches( + num_cols, + num_groups, + 1_000_000, + DEFAULT_BATCH_SIZE, + DataPattern::Sequential, + ); + + for vectorized in [true, false] { + let label = if vectorized { + "vectorized" + } else { + "row_based" + }; + group.bench_with_input( + BenchmarkId::new(label, format!("cols_{num_cols}_grp_1M")), + &batches, + |b, batches| { + b.iter_batched_ref( + || { + ( + create_group_values(&schema, vectorized), + Vec::::with_capacity(DEFAULT_BATCH_SIZE), + ) + }, + |(gv, groups)| bench_intern(gv, batches, groups), + criterion::BatchSize::LargeInput, + ); + }, + ); + } + } + group.finish(); +} + +/// Experiment 6: Group count sweep with fixed 4 columns. +fn bench_group_count_sweep(c: &mut Criterion) { + let mut group = c.benchmark_group("group_count_sweep"); + group.sample_size(15); + + let num_cols = 4; + let schema = make_schema(num_cols); + + for num_groups in [ + 16, 64, 256, 1000, 5000, 10_000, 50_000, 100_000, 500_000, 1_000_000, + ] { + let batches = generate_batches( + num_cols, + num_groups, + 1_000_000, + DEFAULT_BATCH_SIZE, + DataPattern::Sequential, + ); + + for vectorized in [true, false] { + let label = if vectorized { + "vectorized" + } else { + "row_based" + }; + group.bench_with_input( + BenchmarkId::new(label, format!("grp_{num_groups}")), + &batches, + |b, batches| { + b.iter_batched_ref( + || { + ( + create_group_values(&schema, vectorized), + Vec::::with_capacity(DEFAULT_BATCH_SIZE), + ) + }, + |(gv, groups)| bench_intern(gv, batches, groups), + criterion::BatchSize::LargeInput, + ); + }, + ); + } + } + group.finish(); +} + +/// Experiment 7: Random vs sequential data pattern. +fn bench_random_vs_sequential(c: &mut Criterion) { + let mut group = c.benchmark_group("random_vs_sequential"); + group.sample_size(10); + + let num_cols = 3; + let num_groups = 64; + let schema = make_schema(num_cols); + + for num_rows in [1_000_000, 5_000_000] { + for pattern in [DataPattern::Sequential, DataPattern::Random] { + let pattern_label = match pattern { + DataPattern::Sequential => "sequential", + DataPattern::Random => "random", + }; + let batches = generate_batches( + num_cols, + num_groups, + num_rows, + DEFAULT_BATCH_SIZE, + pattern, + ); + + for vectorized in [true, false] { + let label = if vectorized { + "vectorized" + } else { + "row_based" + }; + group.bench_with_input( + BenchmarkId::new( + format!("{label}/{pattern_label}"), + format!("{num_rows}_rows"), + ), + &batches, + |b, batches| { + b.iter_batched_ref( + || { + ( + create_group_values(&schema, vectorized), + Vec::::with_capacity(DEFAULT_BATCH_SIZE), + ) + }, + |(gv, groups)| bench_intern(gv, batches, groups), + criterion::BatchSize::LargeInput, + ); + }, + ); + } + } + } + group.finish(); +} + +criterion_group!( + benches, + bench_issue_17850_regression, + bench_low_cardinality, + bench_batch_size_sensitivity, + bench_column_scaling, + bench_high_cardinality_scaling, + bench_group_count_sweep, + bench_random_vs_sequential, +); +criterion_main!(benches); diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 2f3b1a19e7d73..be289f08c085b 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -30,7 +30,7 @@ use datafusion_expr::EmitTo; pub mod multi_group_by; -mod row; +pub mod row; mod single_group_by; use datafusion_physical_expr::binary_map::OutputType; use multi_group_by::GroupValuesColumn; From c5582f320e6b30fdb6a4018d3ce9a4783912fce3 Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Wed, 20 May 2026 14:38:36 -0400 Subject: [PATCH 4/5] Remove end-to-end SQL benchmark in favor of direct intern() benchmark --- datafusion/core/Cargo.toml | 5 - datafusion/core/benches/multi_group_by.rs | 512 ---------------------- 2 files changed, 517 deletions(-) delete mode 100644 datafusion/core/benches/multi_group_by.rs diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 825d5a2318a64..5019560da9306 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -294,11 +294,6 @@ harness = false name = "preserve_file_partitioning" required-features = ["parquet"] -[[bench]] -harness = false -name = "multi_group_by" -required-features = ["parquet"] - [[bench]] harness = false name = "reset_plan_states" diff --git a/datafusion/core/benches/multi_group_by.rs b/datafusion/core/benches/multi_group_by.rs deleted file mode 100644 index 6279848bf3b61..0000000000000 --- a/datafusion/core/benches/multi_group_by.rs +++ /dev/null @@ -1,512 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Benchmarks for multi-column GROUP BY performance comparing vectorized -//! (`GroupValuesColumn`) vs row-based (`GroupValuesRows`) implementations. -//! -//! Motivated by which -//! showed vectorized can regress 33% for low-cardinality, high-row-count -//! scenarios (3 cols, 64 groups, 1B rows). -//! -//! Design: -//! - **Implementation comparison**: Int32 columns trigger the vectorized path; -//! FixedSizeBinary(4) columns trigger the row-based fallback. -//! - **Execution-only timing**: Pre-optimizes the logical plan once; each -//! iteration only does physical planning + execution. -//! - **Exact cardinality**: Sequential enumeration guarantees precise NDV. -//! - **Regression scenario**: Reproduces issue #17850 conditions (few groups, -//! many rows) to identify the crossover point. - -use arrow::array::{ArrayRef, FixedSizeBinaryArray, Int32Array, RecordBatch}; -use arrow::datatypes::{DataType, Field, Schema}; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; -use datafusion::execution::SessionState; -use datafusion::physical_plan::collect; -use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_expr::LogicalPlan; -use parquet::arrow::ArrowWriter; -use parquet::file::properties::WriterProperties; -use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; -use std::hint::black_box; -use std::sync::Arc; -use tempfile::NamedTempFile; -use tokio::runtime::Runtime; - -const DEFAULT_NUM_ROWS: usize = 1_000_000; -const DEFAULT_BATCH_SIZE: usize = 8192; - -#[derive(Clone, Copy, PartialEq, Eq)] -enum GroupMode { - Vectorized, - RowBased, -} - -impl std::fmt::Display for GroupMode { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - GroupMode::Vectorized => write!(f, "vectorized"), - GroupMode::RowBased => write!(f, "row_based"), - } - } -} - -#[derive(Clone, Copy, PartialEq, Eq)] -enum DataPattern { - /// Sequential enumeration: deterministic cycling through all group combinations. - Sequential, - /// Random: each column independently picks a value from 0..per_col_card using a seeded RNG. - /// This matches the data generation in issue #17850 (random() * 4)::integer. - Random, -} - -impl std::fmt::Display for DataPattern { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - DataPattern::Sequential => write!(f, "sequential"), - DataPattern::Random => write!(f, "random"), - } - } -} - -fn build_group_by_sql(num_cols: usize) -> String { - let cols: Vec = (0..num_cols).map(|i| format!("col_{i}")).collect(); - let col_list = cols.join(", "); - format!("SELECT {col_list} FROM t GROUP BY {col_list}") -} - -fn generate_parquet_file( - num_cols: usize, - num_distinct_groups: usize, - num_rows: usize, - batch_size: usize, - mode: GroupMode, - pattern: DataPattern, -) -> NamedTempFile { - let fields: Vec = (0..num_cols) - .map(|i| { - let dt = match mode { - GroupMode::Vectorized => DataType::Int32, - GroupMode::RowBased => DataType::FixedSizeBinary(4), - }; - Field::new(format!("col_{i}"), dt, false) - }) - .collect(); - let schema = Arc::new(Schema::new(fields)); - - let per_col_card = (num_distinct_groups as f64) - .powf(1.0 / num_cols as f64) - .ceil() as usize; - - let mut temp_file = tempfile::Builder::new() - .prefix("multi_group_by") - .suffix(".parquet") - .tempfile() - .unwrap(); - - let props = WriterProperties::builder() - .set_max_row_group_row_count(Some(num_rows)) - .build(); - - let mut writer = - ArrowWriter::try_new(&mut temp_file, Arc::clone(&schema), Some(props)).unwrap(); - - let mut rng = StdRng::seed_from_u64(42); - let num_batches = num_rows / batch_size; - for batch_idx in 0..num_batches { - let batch_start = batch_idx * batch_size; - let columns: Vec = (0..num_cols) - .map(|col_idx| match mode { - GroupMode::Vectorized => { - let values: Vec = (0..batch_size) - .map(|row| match pattern { - DataPattern::Sequential => { - let global_row = batch_start + row; - let group_id = global_row % num_distinct_groups; - let divisor = per_col_card.pow(col_idx as u32); - ((group_id / divisor) % per_col_card) as i32 - } - DataPattern::Random => { - rng.random_range(0..per_col_card as i32) - } - }) - .collect(); - Arc::new(Int32Array::from(values)) as ArrayRef - } - GroupMode::RowBased => { - let values: Vec> = (0..batch_size) - .map(|row| { - let val = match pattern { - DataPattern::Sequential => { - let global_row = batch_start + row; - let group_id = global_row % num_distinct_groups; - let divisor = per_col_card.pow(col_idx as u32); - ((group_id / divisor) % per_col_card) as i32 - } - DataPattern::Random => { - rng.random_range(0..per_col_card as i32) - } - }; - val.to_le_bytes().to_vec() - }) - .collect(); - let refs: Vec<&[u8]> = values.iter().map(|v| v.as_slice()).collect(); - Arc::new( - FixedSizeBinaryArray::try_from_iter(refs.into_iter()).unwrap(), - ) as ArrayRef - } - }) - .collect(); - let batch = RecordBatch::try_new(Arc::clone(&schema), columns).unwrap(); - writer.write(&batch).unwrap(); - } - - writer.close().unwrap(); - temp_file -} - -struct BenchContext { - session_state: SessionState, - logical_plan: LogicalPlan, - _temp_file: NamedTempFile, -} - -fn prepare_context( - rt: &Runtime, - num_cols: usize, - num_distinct_groups: usize, - num_rows: usize, - batch_size: usize, - mode: GroupMode, - pattern: DataPattern, -) -> BenchContext { - let temp_file = generate_parquet_file( - num_cols, - num_distinct_groups, - num_rows, - batch_size, - mode, - pattern, - ); - let path = temp_file.path().to_str().unwrap().to_string(); - - let config = SessionConfig::new() - .with_target_partitions(1) - .with_batch_size(batch_size); - let ctx = SessionContext::new_with_config(config); - let sql = build_group_by_sql(num_cols); - - let (session_state, logical_plan) = rt.block_on(async { - ctx.register_parquet("t", &path, Default::default()) - .await - .unwrap(); - // Warm the OS page cache - let warmup_df = ctx.sql(&sql).await.unwrap(); - let _ = warmup_df.collect().await.unwrap(); - // Pre-optimize: parse SQL and produce optimized logical plan once - let df = ctx.sql(&sql).await.unwrap(); - df.into_parts() - }); - - BenchContext { - session_state, - logical_plan, - _temp_file: temp_file, - } -} - -fn execute_query(rt: &Runtime, state: &SessionState, plan: &LogicalPlan) { - rt.block_on(async { - let physical_plan = state.create_physical_plan(plan).await.unwrap(); - let task_ctx = state.task_ctx(); - let results = collect(physical_plan, task_ctx).await.unwrap(); - black_box(results); - }); -} - -/// Experiment 1: Reproduce issue #17850 scenario. -/// 3 columns, cardinality 4 per column (=64 groups), varying row counts. -/// This is the exact configuration where vectorized regressed 33%. -fn bench_issue_17850_regression(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("issue_17850_regression"); - group.sample_size(10); - - let num_cols = 3; - let num_groups = 64; // 4^3 - - // Scale row count to find where overhead dominates - for num_rows in [1_000_000, 5_000_000, 10_000_000, 20_000_000, 50_000_000] { - for mode in [GroupMode::Vectorized, GroupMode::RowBased] { - let b_ctx = prepare_context( - &rt, - num_cols, - num_groups, - num_rows, - DEFAULT_BATCH_SIZE, - mode, - DataPattern::Sequential, - ); - group.bench_with_input( - BenchmarkId::new(format!("{mode}"), format!("{num_rows}_rows")), - &num_rows, - |b, _| { - b.iter(|| { - execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) - }) - }, - ); - } - } - group.finish(); -} - -/// Experiment 2: Low cardinality sweep. -/// Fixed 3-4 columns, very few groups (4-256), default row count. -/// Tests the per-batch overhead sensitivity. -fn bench_low_cardinality(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("low_cardinality"); - group.sample_size(15); - - // 3 cols with varying per-column cardinality - for (num_cols, per_col_card) in - [(3usize, 2usize), (3, 4), (3, 8), (4, 2), (4, 4), (4, 8)] - { - let num_groups = per_col_card.pow(num_cols as u32); - for mode in [GroupMode::Vectorized, GroupMode::RowBased] { - let b_ctx = prepare_context( - &rt, - num_cols, - num_groups, - DEFAULT_NUM_ROWS, - DEFAULT_BATCH_SIZE, - mode, - DataPattern::Sequential, - ); - group.bench_with_input( - BenchmarkId::new( - format!("{mode}"), - format!("cols_{num_cols}_card_{per_col_card}_grp_{num_groups}"), - ), - &num_groups, - |b, _| { - b.iter(|| { - execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) - }) - }, - ); - } - } - group.finish(); -} - -/// Experiment 3: Batch size sensitivity. -/// Tests how different batch sizes affect the per-batch overhead ratio. -/// Issue #17850 suggests vectorized has higher fixed per-batch cost. -fn bench_batch_size_sensitivity(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("batch_size_sensitivity"); - group.sample_size(10); - - let num_cols = 3; - let num_groups = 64; // 4^3, same as issue #17850 - - for batch_size in [1024, 4096, 8192, 16384, 32768] { - for mode in [GroupMode::Vectorized, GroupMode::RowBased] { - let b_ctx = prepare_context( - &rt, - num_cols, - num_groups, - DEFAULT_NUM_ROWS, - batch_size, - mode, - DataPattern::Sequential, - ); - group.bench_with_input( - BenchmarkId::new(format!("{mode}"), format!("batch_{batch_size}")), - &batch_size, - |b, _| { - b.iter(|| { - execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) - }) - }, - ); - } - } - group.finish(); -} - -/// Experiment 4: Column count scaling. -/// Fixed low group count, increasing columns to isolate per-column overhead. -fn bench_column_scaling(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("column_scaling"); - group.sample_size(15); - - let fixed_low_cases: &[(usize, usize)] = - &[(2, 100), (3, 125), (4, 81), (6, 729), (8, 256), (10, 1024)]; - - for &(num_cols, num_groups) in fixed_low_cases { - for mode in [GroupMode::Vectorized, GroupMode::RowBased] { - let b_ctx = prepare_context( - &rt, - num_cols, - num_groups, - DEFAULT_NUM_ROWS, - DEFAULT_BATCH_SIZE, - mode, - DataPattern::Sequential, - ); - group.bench_with_input( - BenchmarkId::new( - format!("{mode}"), - format!("cols_{num_cols}_grp_{num_groups}"), - ), - &num_cols, - |b, _| { - b.iter(|| { - execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) - }) - }, - ); - } - } - group.finish(); -} - -/// Experiment 5: High cardinality column scaling. -/// ~1M groups with increasing column count. -fn bench_high_cardinality_scaling(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("high_cardinality_scaling"); - group.sample_size(10); - - for num_cols in [2, 3, 4, 6, 8, 10] { - let num_groups = DEFAULT_NUM_ROWS; // 1M groups = ~unique per row - for mode in [GroupMode::Vectorized, GroupMode::RowBased] { - let b_ctx = prepare_context( - &rt, - num_cols, - num_groups, - DEFAULT_NUM_ROWS, - DEFAULT_BATCH_SIZE, - mode, - DataPattern::Sequential, - ); - group.bench_with_input( - BenchmarkId::new(format!("{mode}"), format!("cols_{num_cols}_grp_1M")), - &num_cols, - |b, _| { - b.iter(|| { - execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) - }) - }, - ); - } - } - group.finish(); -} - -/// Experiment 6: Group count crossover search. -/// Fixed 4 columns, sweep group count to find crossover point. -fn bench_group_count_sweep(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("group_count_sweep"); - group.sample_size(15); - - for num_groups in [ - 16, 64, 256, 1000, 5000, 10_000, 50_000, 100_000, 500_000, 1_000_000, - ] { - for mode in [GroupMode::Vectorized, GroupMode::RowBased] { - let b_ctx = prepare_context( - &rt, - 4, - num_groups, - DEFAULT_NUM_ROWS, - DEFAULT_BATCH_SIZE, - mode, - DataPattern::Sequential, - ); - group.bench_with_input( - BenchmarkId::new(format!("{mode}"), format!("grp_{num_groups}")), - &num_groups, - |b, _| { - b.iter(|| { - execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) - }) - }, - ); - } - } - group.finish(); -} - -/// Experiment 7: Random vs sequential data pattern. -/// Tests whether random access patterns (matching issue #17850's exact SQL using -/// `(random() * 4)::integer`) change the performance characteristics due to cache -/// effects in hash probing. -/// 3 cols, per_col_card=4 (64 theoretical groups), 1M and 5M rows. -fn bench_random_vs_sequential(c: &mut Criterion) { - let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("random_vs_sequential"); - group.sample_size(10); - - let num_cols = 3; - let num_groups = 64; // 4^3 - - for num_rows in [1_000_000, 5_000_000] { - for pattern in [DataPattern::Sequential, DataPattern::Random] { - for mode in [GroupMode::Vectorized, GroupMode::RowBased] { - let b_ctx = prepare_context( - &rt, - num_cols, - num_groups, - num_rows, - DEFAULT_BATCH_SIZE, - mode, - pattern, - ); - group.bench_with_input( - BenchmarkId::new( - format!("{mode}/{pattern}"), - format!("{num_rows}_rows"), - ), - &num_rows, - |b, _| { - b.iter(|| { - execute_query(&rt, &b_ctx.session_state, &b_ctx.logical_plan) - }) - }, - ); - } - } - } - group.finish(); -} - -criterion_group!( - benches, - bench_issue_17850_regression, - bench_low_cardinality, - bench_batch_size_sensitivity, - bench_column_scaling, - bench_high_cardinality_scaling, - bench_group_count_sweep, - bench_random_vs_sequential, -); -criterion_main!(benches); From bc4e23655bbd09e7211de5832d0cefe4cc1d6c57 Mon Sep 17 00:00:00 2001 From: Nathan Bezualem Date: Wed, 20 May 2026 14:46:50 -0400 Subject: [PATCH 5/5] Remove random vs sequential experiment --- .../physical-plan/benches/multi_group_by.rs | 133 +++--------------- 1 file changed, 16 insertions(+), 117 deletions(-) diff --git a/datafusion/physical-plan/benches/multi_group_by.rs b/datafusion/physical-plan/benches/multi_group_by.rs index ca1606044368c..f90b6e7358270 100644 --- a/datafusion/physical-plan/benches/multi_group_by.rs +++ b/datafusion/physical-plan/benches/multi_group_by.rs @@ -31,8 +31,6 @@ use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; use datafusion_physical_plan::aggregates::group_values::GroupValues; use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupValuesColumn; use datafusion_physical_plan::aggregates::group_values::row::GroupValuesRows; -use rand::rngs::StdRng; -use rand::{Rng, SeedableRng}; use std::hint::black_box; use std::sync::Arc; @@ -45,24 +43,16 @@ fn make_schema(num_cols: usize) -> SchemaRef { Arc::new(Schema::new(fields)) } -#[derive(Clone, Copy)] -enum DataPattern { - Sequential, - Random, -} - fn generate_batches( num_cols: usize, num_distinct_groups: usize, num_rows: usize, batch_size: usize, - pattern: DataPattern, ) -> Vec> { let per_col_card = (num_distinct_groups as f64) .powf(1.0 / num_cols as f64) .ceil() as usize; - let mut rng = StdRng::seed_from_u64(42); let num_batches = num_rows / batch_size; (0..num_batches) @@ -71,16 +61,11 @@ fn generate_batches( (0..num_cols) .map(|col_idx| { let values: Vec = (0..batch_size) - .map(|row| match pattern { - DataPattern::Sequential => { - let global_row = batch_start + row; - let group_id = global_row % num_distinct_groups; - let divisor = per_col_card.pow(col_idx as u32); - ((group_id / divisor) % per_col_card) as i32 - } - DataPattern::Random => { - rng.random_range(0..per_col_card as i32) - } + .map(|row| { + let global_row = batch_start + row; + let group_id = global_row % num_distinct_groups; + let divisor = per_col_card.pow(col_idx as u32); + ((group_id / divisor) % per_col_card) as i32 }) .collect(); Arc::new(Int32Array::from(values)) as ArrayRef @@ -121,13 +106,8 @@ fn bench_issue_17850_regression(c: &mut Criterion) { let schema = make_schema(num_cols); for num_rows in [1_000_000, 5_000_000, 10_000_000, 20_000_000, 50_000_000] { - let batches = generate_batches( - num_cols, - num_groups, - num_rows, - DEFAULT_BATCH_SIZE, - DataPattern::Sequential, - ); + let batches = + generate_batches(num_cols, num_groups, num_rows, DEFAULT_BATCH_SIZE); for vectorized in [true, false] { let label = if vectorized { @@ -166,13 +146,8 @@ fn bench_low_cardinality(c: &mut Criterion) { { let num_groups = per_col_card.pow(num_cols as u32); let schema = make_schema(num_cols); - let batches = generate_batches( - num_cols, - num_groups, - 1_000_000, - DEFAULT_BATCH_SIZE, - DataPattern::Sequential, - ); + let batches = + generate_batches(num_cols, num_groups, 1_000_000, DEFAULT_BATCH_SIZE); for vectorized in [true, false] { let label = if vectorized { @@ -214,13 +189,7 @@ fn bench_batch_size_sensitivity(c: &mut Criterion) { let schema = make_schema(num_cols); for batch_size in [1024, 4096, 8192, 16384, 32768] { - let batches = generate_batches( - num_cols, - num_groups, - 1_000_000, - batch_size, - DataPattern::Sequential, - ); + let batches = generate_batches(num_cols, num_groups, 1_000_000, batch_size); for vectorized in [true, false] { let label = if vectorized { @@ -259,13 +228,8 @@ fn bench_column_scaling(c: &mut Criterion) { for &(num_cols, num_groups) in cases { let schema = make_schema(num_cols); - let batches = generate_batches( - num_cols, - num_groups, - 1_000_000, - DEFAULT_BATCH_SIZE, - DataPattern::Sequential, - ); + let batches = + generate_batches(num_cols, num_groups, 1_000_000, DEFAULT_BATCH_SIZE); for vectorized in [true, false] { let label = if vectorized { @@ -302,13 +266,8 @@ fn bench_high_cardinality_scaling(c: &mut Criterion) { for num_cols in [2, 3, 4, 6, 8, 10] { let num_groups = 1_000_000; let schema = make_schema(num_cols); - let batches = generate_batches( - num_cols, - num_groups, - 1_000_000, - DEFAULT_BATCH_SIZE, - DataPattern::Sequential, - ); + let batches = + generate_batches(num_cols, num_groups, 1_000_000, DEFAULT_BATCH_SIZE); for vectorized in [true, false] { let label = if vectorized { @@ -348,13 +307,8 @@ fn bench_group_count_sweep(c: &mut Criterion) { for num_groups in [ 16, 64, 256, 1000, 5000, 10_000, 50_000, 100_000, 500_000, 1_000_000, ] { - let batches = generate_batches( - num_cols, - num_groups, - 1_000_000, - DEFAULT_BATCH_SIZE, - DataPattern::Sequential, - ); + let batches = + generate_batches(num_cols, num_groups, 1_000_000, DEFAULT_BATCH_SIZE); for vectorized in [true, false] { let label = if vectorized { @@ -383,60 +337,6 @@ fn bench_group_count_sweep(c: &mut Criterion) { group.finish(); } -/// Experiment 7: Random vs sequential data pattern. -fn bench_random_vs_sequential(c: &mut Criterion) { - let mut group = c.benchmark_group("random_vs_sequential"); - group.sample_size(10); - - let num_cols = 3; - let num_groups = 64; - let schema = make_schema(num_cols); - - for num_rows in [1_000_000, 5_000_000] { - for pattern in [DataPattern::Sequential, DataPattern::Random] { - let pattern_label = match pattern { - DataPattern::Sequential => "sequential", - DataPattern::Random => "random", - }; - let batches = generate_batches( - num_cols, - num_groups, - num_rows, - DEFAULT_BATCH_SIZE, - pattern, - ); - - for vectorized in [true, false] { - let label = if vectorized { - "vectorized" - } else { - "row_based" - }; - group.bench_with_input( - BenchmarkId::new( - format!("{label}/{pattern_label}"), - format!("{num_rows}_rows"), - ), - &batches, - |b, batches| { - b.iter_batched_ref( - || { - ( - create_group_values(&schema, vectorized), - Vec::::with_capacity(DEFAULT_BATCH_SIZE), - ) - }, - |(gv, groups)| bench_intern(gv, batches, groups), - criterion::BatchSize::LargeInput, - ); - }, - ); - } - } - } - group.finish(); -} - criterion_group!( benches, bench_issue_17850_regression, @@ -445,6 +345,5 @@ criterion_group!( bench_column_scaling, bench_high_cardinality_scaling, bench_group_count_sweep, - bench_random_vs_sequential, ); criterion_main!(benches);