From 78326cbbd86e7e04aa100605e9c9e5b2f6e718ab Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 10:12:56 +0800 Subject: [PATCH 1/4] Improve Statistics::try_merge_iter aggregation Aggregate easy fields first and recompute distinct_count from original per-input column stats. Utilize a new multi-way overlap helper for merging instead of pairwise-smeared NDV/min/max. Added regression coverage for three-input counterexample and permutation-based order invariance. --- datafusion/common/src/stats.rs | 244 ++++++++++++++++++++++++++++++--- 1 file changed, 228 insertions(+), 16 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 4cf5cc366158b..48d18ae7f43e9 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -713,27 +713,14 @@ impl Statistics { }) .collect(); - // Accumulate all statistics in a single pass. - // Uses precision_add for sum (reuses the lhs accumulator for - // direct numeric addition), while preserving the NDV update - // ordering required by estimate_ndv_with_overlap. + // Accumulate mergeable statistics in a single pass. + // `distinct_count` is recomputed afterward from the original + // unsmeared inputs so multi-input merges stay order-stable. for stat in items.iter().skip(1) { for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { let item_cs = &stat.column_statistics[col_idx]; col_stats.null_count = col_stats.null_count.add(&item_cs.null_count); - - // NDV must be computed before min/max update (needs pre-merge ranges) - col_stats.distinct_count = match ( - col_stats.distinct_count.get_value(), - item_cs.distinct_count.get_value(), - ) { - (Some(&l), Some(&r)) => Precision::Inexact( - estimate_ndv_with_overlap(col_stats, item_cs, l, r) - .unwrap_or_else(|| usize::max(l, r)), - ), - _ => Precision::Absent, - }; col_stats.min_value = col_stats.min_value.min(&item_cs.min_value); col_stats.max_value = col_stats.max_value.max(&item_cs.max_value); let item_sum_value = item_cs.sum_value.cast_to_sum_type(); @@ -742,6 +729,35 @@ impl Statistics { } } + for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { + let ndv_inputs: Option> = items + .iter() + .map(|stat| { + let input = &stat.column_statistics[col_idx]; + input + .distinct_count + .get_value() + .copied() + .map(|ndv| (input, ndv)) + }) + .collect(); + + col_stats.distinct_count = match ndv_inputs { + Some(inputs) => { + let fallback = inputs + .iter() + .map(|(_, ndv)| *ndv) + .max() + .expect("statistics merge requires at least one input"); + Precision::Inexact( + estimate_ndv_with_overlap_many(&inputs) + .unwrap_or(fallback), + ) + } + None => Precision::Absent, + }; + } + Ok(Statistics { num_rows, total_byte_size, @@ -840,6 +856,102 @@ pub fn estimate_ndv_with_overlap( Some((intersection + only_left + only_right).round() as usize) } +fn estimate_ndv_with_overlap_many( + inputs: &[(&ColumnStatistics, usize)], +) -> Option { + if inputs.is_empty() { + return Some(0); + } + + if inputs.len() == 1 { + return Some(inputs[0].1); + } + + struct RangedInput<'a> { + min: &'a ScalarValue, + max: &'a ScalarValue, + ndv: usize, + range: f64, + } + + let mut ranged_inputs = Vec::new(); + let mut point_inputs = Vec::new(); + let mut boundaries = Vec::new(); + + for (stats, ndv) in inputs { + let min = stats.min_value.get_value()?; + let max = stats.max_value.get_value()?; + let range = max.distance(min)? as f64; + + if range == 0.0 { + point_inputs.push((min, *ndv)); + continue; + } + + boundaries.push(min.clone()); + boundaries.push(max.clone()); + ranged_inputs.push(RangedInput { + min, + max, + ndv: *ndv, + range, + }); + } + + boundaries.sort_by(|left, right| { + left.partial_cmp(right) + .expect("statistics merge requires comparable boundary values") + }); + boundaries.dedup(); + + let mut estimate = 0.0; + for window in boundaries.windows(2) { + let segment_start = &window[0]; + let segment_end = &window[1]; + let segment_range = segment_end.distance(segment_start)? as f64; + + if segment_range == 0.0 { + continue; + } + + let segment_estimate = ranged_inputs + .iter() + .filter(|input| input.min <= segment_start && input.max >= segment_end) + .map(|input| segment_range * input.ndv as f64 / input.range) + .fold(0.0, f64::max); + estimate += segment_estimate; + } + + let mut point_values: Vec = point_inputs + .iter() + .map(|(value, _)| (*value).clone()) + .collect(); + point_values.sort_by(|left, right| { + left.partial_cmp(right) + .expect("statistics merge requires comparable point values") + }); + point_values.dedup(); + + for value in point_values { + let point_ndv = point_inputs + .iter() + .filter(|(point, _)| *point == &value) + .map(|(_, ndv)| *ndv) + .max() + .expect("point inputs were grouped from existing values"); + let covered_ndv = ranged_inputs + .iter() + .filter(|input| input.min <= &value && &value <= input.max) + .map(|input| input.ndv) + .max() + .unwrap_or(0); + + estimate += point_ndv.saturating_sub(covered_ndv) as f64; + } + + Some(estimate.round() as usize) +} + /// Creates an estimate of the number of rows in the output using the given /// optional value and exactness flag. fn check_num_rows(value: Option, is_exact: bool) -> Precision { @@ -1812,6 +1924,106 @@ mod tests { ); } + #[test] + fn test_try_merge_ndv_order_invariant_across_permutations() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let stats_a = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(100)), + ); + let stats_b = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(40)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(60)))) + .with_distinct_count(Precision::Exact(10)), + ); + let stats_c = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150)))) + .with_distinct_count(Precision::Exact(100)), + ); + + let permutations = [ + [&stats_a, &stats_b, &stats_c], + [&stats_a, &stats_c, &stats_b], + [&stats_b, &stats_a, &stats_c], + [&stats_b, &stats_c, &stats_a], + [&stats_c, &stats_a, &stats_b], + [&stats_c, &stats_b, &stats_a], + ]; + + for (idx, inputs) in permutations.into_iter().enumerate() { + let merged = Statistics::try_merge_iter(inputs, &schema).unwrap(); + assert_eq!( + merged.column_statistics[0].distinct_count, + Precision::Inexact(150), + "permutation {idx} should be order invariant", + ); + } + } + + #[test] + fn test_try_merge_ndv_nested_pairwise_merge_still_smears_inputs() { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let stats_a = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_distinct_count(Precision::Exact(100)), + ); + let stats_b = Statistics::default() + .with_num_rows(Precision::Exact(10)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(40)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(60)))) + .with_distinct_count(Precision::Exact(10)), + ); + let stats_c = Statistics::default() + .with_num_rows(Precision::Exact(100)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150)))) + .with_distinct_count(Precision::Exact(100)), + ); + + let merged_direct = + Statistics::try_merge_iter([&stats_a, &stats_b, &stats_c], &schema).unwrap(); + let merged_ab = + Statistics::try_merge_iter([&stats_a, &stats_b], &schema).unwrap(); + let merged_bc = + Statistics::try_merge_iter([&stats_b, &stats_c], &schema).unwrap(); + let merged_ab_c = + Statistics::try_merge_iter([&merged_ab, &stats_c], &schema).unwrap(); + let merged_a_bc = + Statistics::try_merge_iter([&stats_a, &merged_bc], &schema).unwrap(); + + assert_eq!( + merged_direct.column_statistics[0].distinct_count, + Precision::Inexact(150) + ); + assert_eq!( + merged_ab_c.column_statistics[0].distinct_count, + Precision::Inexact(150) + ); + assert_eq!( + merged_a_bc.column_statistics[0].distinct_count, + Precision::Inexact(148) + ); + } + #[test] fn test_with_fetch_basic_preservation() { // Test that column statistics and byte size are preserved (as inexact) when applying fetch From 1a8b1b4cf0c8282295a3ba8812ace9090731d18c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 10:25:11 +0800 Subject: [PATCH 2/4] Enhance NDV estimation and improve comments for clarity in Statistics module --- datafusion/common/src/stats.rs | 81 ++++++++++------------------------ 1 file changed, 24 insertions(+), 57 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 48d18ae7f43e9..dc25fec04f55d 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -715,7 +715,8 @@ impl Statistics { // Accumulate mergeable statistics in a single pass. // `distinct_count` is recomputed afterward from the original - // unsmeared inputs so multi-input merges stay order-stable. + // unsmeared inputs passed to this call so direct multi-input + // merges stay order-stable. for stat in items.iter().skip(1) { for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { let item_cs = &stat.column_statistics[col_idx]; @@ -856,9 +857,28 @@ pub fn estimate_ndv_with_overlap( Some((intersection + only_left + only_right).round() as usize) } -fn estimate_ndv_with_overlap_many( - inputs: &[(&ColumnStatistics, usize)], -) -> Option { +/// Estimates the combined number of distinct values (NDV) for multiple inputs +/// by partitioning the overall value space into non-overlapping segments. +/// +/// For each open interval between sorted min/max boundaries, this helper: +/// +/// - finds every input range that fully covers the segment, +/// - estimates each input's contribution using uniform density +/// `segment_width * ndv / full_range_width`, +/// - keeps only the maximum contribution for that segment. +/// +/// This is a multi-way analogue of [`estimate_ndv_with_overlap`], but it avoids +/// repeatedly feeding synthesized min/max/NDV values back into later merges. +/// That makes `Statistics::try_merge_iter` stable across permutations of the +/// original inputs passed to a single call. +/// +/// Constant inputs (`min == max`) are handled separately as point values. +/// If a point is already covered by one or more ranged inputs, only the +/// uncovered remainder is added by taking `max(point_ndv - covered_ndv, 0)`. +/// +/// Returns `None` when any input lacks comparable min/max values or when +/// distance is unsupported for the underlying scalar type. +fn estimate_ndv_with_overlap_many(inputs: &[(&ColumnStatistics, usize)]) -> Option { if inputs.is_empty() { return Some(0); } @@ -1971,59 +1991,6 @@ mod tests { } } - #[test] - fn test_try_merge_ndv_nested_pairwise_merge_still_smears_inputs() { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); - let stats_a = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_distinct_count(Precision::Exact(100)), - ); - let stats_b = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(40)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(60)))) - .with_distinct_count(Precision::Exact(10)), - ); - let stats_c = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150)))) - .with_distinct_count(Precision::Exact(100)), - ); - - let merged_direct = - Statistics::try_merge_iter([&stats_a, &stats_b, &stats_c], &schema).unwrap(); - let merged_ab = - Statistics::try_merge_iter([&stats_a, &stats_b], &schema).unwrap(); - let merged_bc = - Statistics::try_merge_iter([&stats_b, &stats_c], &schema).unwrap(); - let merged_ab_c = - Statistics::try_merge_iter([&merged_ab, &stats_c], &schema).unwrap(); - let merged_a_bc = - Statistics::try_merge_iter([&stats_a, &merged_bc], &schema).unwrap(); - - assert_eq!( - merged_direct.column_statistics[0].distinct_count, - Precision::Inexact(150) - ); - assert_eq!( - merged_ab_c.column_statistics[0].distinct_count, - Precision::Inexact(150) - ); - assert_eq!( - merged_a_bc.column_statistics[0].distinct_count, - Precision::Inexact(148) - ); - } - #[test] fn test_with_fetch_basic_preservation() { // Test that column statistics and byte size are preserved (as inexact) when applying fetch From 8b1fed1db46b78ba9f1237a10a84fd6ff8f58def Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 10:47:38 +0800 Subject: [PATCH 3/4] Refactor NDV estimation and improve code clarity Replace the index-heavy merge loop with zip-based iteration. Simplify the NDV recomputation path using map(...).unwrap_or(...). Extract a private max_input_ndv helper for the fallback case. Deduplicate scalar sort/dedup logic in estimate_ndv_with_overlap_many. Add small private test helpers to reduce repeated one-column Int32 NDV fixture setup. --- datafusion/common/src/stats.rs | 180 ++++++++++++--------------------- 1 file changed, 64 insertions(+), 116 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index dc25fec04f55d..fc61d052684d1 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -718,9 +718,10 @@ impl Statistics { // unsmeared inputs passed to this call so direct multi-input // merges stay order-stable. for stat in items.iter().skip(1) { - for (col_idx, col_stats) in column_statistics.iter_mut().enumerate() { - let item_cs = &stat.column_statistics[col_idx]; - + for (col_stats, item_cs) in column_statistics + .iter_mut() + .zip(stat.column_statistics.iter()) + { col_stats.null_count = col_stats.null_count.add(&item_cs.null_count); col_stats.min_value = col_stats.min_value.min(&item_cs.min_value); col_stats.max_value = col_stats.max_value.max(&item_cs.max_value); @@ -743,20 +744,14 @@ impl Statistics { }) .collect(); - col_stats.distinct_count = match ndv_inputs { - Some(inputs) => { - let fallback = inputs - .iter() - .map(|(_, ndv)| *ndv) - .max() - .expect("statistics merge requires at least one input"); + col_stats.distinct_count = ndv_inputs + .map(|inputs| { Precision::Inexact( estimate_ndv_with_overlap_many(&inputs) - .unwrap_or(fallback), + .unwrap_or_else(|| max_input_ndv(&inputs)), ) - } - None => Precision::Absent, - }; + }) + .unwrap_or(Precision::Absent); } Ok(Statistics { @@ -897,6 +892,10 @@ fn estimate_ndv_with_overlap_many(inputs: &[(&ColumnStatistics, usize)]) -> Opti let mut ranged_inputs = Vec::new(); let mut point_inputs = Vec::new(); let mut boundaries = Vec::new(); + let sort_and_dedup_scalars = |values: &mut Vec, message: &str| { + values.sort_by(|left, right| left.partial_cmp(right).expect(message)); + values.dedup(); + }; for (stats, ndv) in inputs { let min = stats.min_value.get_value()?; @@ -918,11 +917,10 @@ fn estimate_ndv_with_overlap_many(inputs: &[(&ColumnStatistics, usize)]) -> Opti }); } - boundaries.sort_by(|left, right| { - left.partial_cmp(right) - .expect("statistics merge requires comparable boundary values") - }); - boundaries.dedup(); + sort_and_dedup_scalars( + &mut boundaries, + "statistics merge requires comparable boundary values", + ); let mut estimate = 0.0; for window in boundaries.windows(2) { @@ -946,11 +944,10 @@ fn estimate_ndv_with_overlap_many(inputs: &[(&ColumnStatistics, usize)]) -> Opti .iter() .map(|(value, _)| (*value).clone()) .collect(); - point_values.sort_by(|left, right| { - left.partial_cmp(right) - .expect("statistics merge requires comparable point values") - }); - point_values.dedup(); + sort_and_dedup_scalars( + &mut point_values, + "statistics merge requires comparable point values", + ); for value in point_values { let point_ndv = point_inputs @@ -972,6 +969,14 @@ fn estimate_ndv_with_overlap_many(inputs: &[(&ColumnStatistics, usize)]) -> Opti Some(estimate.round() as usize) } +fn max_input_ndv(inputs: &[(&ColumnStatistics, usize)]) -> usize { + inputs + .iter() + .map(|(_, ndv)| *ndv) + .max() + .expect("statistics merge requires at least one input") +} + /// Creates an estimate of the number of rows in the output using the given /// optional value and exactness flag. fn check_num_rows(value: Option, is_exact: bool) -> Precision { @@ -1176,6 +1181,26 @@ mod tests { use arrow::datatypes::Field; use std::sync::Arc; + fn int32_test_schema() -> Schema { + Schema::new(vec![Field::new("a", DataType::Int32, true)]) + } + + fn int32_ndv_stats( + num_rows: usize, + min: i32, + max: i32, + distinct_count: usize, + ) -> Statistics { + Statistics::default() + .with_num_rows(Precision::Exact(num_rows)) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(min)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(max)))) + .with_distinct_count(Precision::Exact(distinct_count)), + ) + } + #[test] fn test_get_value() { let exact_precision = Precision::Exact(42); @@ -1778,24 +1803,10 @@ mod tests { #[test] fn test_try_merge_ndv_identical_ranges() { - let stats1 = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_distinct_count(Precision::Exact(50)), - ); - let stats2 = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_distinct_count(Precision::Exact(30)), - ); + let stats1 = int32_ndv_stats(100, 0, 100, 50); + let stats2 = int32_ndv_stats(100, 0, 100, 30); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let schema = int32_test_schema(); let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap(); // Full overlap -> max(50, 30) = 50 assert_eq!( @@ -1806,24 +1817,10 @@ mod tests { #[test] fn test_try_merge_ndv_partial_overlap() { - let stats1 = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_distinct_count(Precision::Exact(80)), - ); - let stats2 = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150)))) - .with_distinct_count(Precision::Exact(60)), - ); + let stats1 = int32_ndv_stats(100, 0, 100, 80); + let stats2 = int32_ndv_stats(100, 50, 150, 60); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let schema = int32_test_schema(); let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap(); // overlap=[50,100], range_left=100, range_right=100, overlap_range=50 // overlap_left=80*(50/100)=40, overlap_right=60*(50/100)=30 @@ -1895,24 +1892,10 @@ mod tests { #[test] fn test_try_merge_ndv_constant_columns() { // Same constant: [5,5]+[5,5] -> max - let stats1 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_distinct_count(Precision::Exact(1)), - ); - let stats2 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_distinct_count(Precision::Exact(1)), - ); + let stats1 = int32_ndv_stats(10, 5, 5, 1); + let stats2 = int32_ndv_stats(10, 5, 5, 1); - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + let schema = int32_test_schema(); let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap(); assert_eq!( merged.column_statistics[0].distinct_count, @@ -1920,22 +1903,8 @@ mod tests { ); // Different constants: [5,5]+[10,10] -> sum - let stats3 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(5)))) - .with_distinct_count(Precision::Exact(1)), - ); - let stats4 = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(10)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))) - .with_distinct_count(Precision::Exact(1)), - ); + let stats3 = int32_ndv_stats(10, 5, 5, 1); + let stats4 = int32_ndv_stats(10, 10, 10, 1); let merged = Statistics::try_merge_iter([&stats3, &stats4], &schema).unwrap(); assert_eq!( @@ -1946,31 +1915,10 @@ mod tests { #[test] fn test_try_merge_ndv_order_invariant_across_permutations() { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); - let stats_a = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_distinct_count(Precision::Exact(100)), - ); - let stats_b = Statistics::default() - .with_num_rows(Precision::Exact(10)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(40)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(60)))) - .with_distinct_count(Precision::Exact(10)), - ); - let stats_c = Statistics::default() - .with_num_rows(Precision::Exact(100)) - .add_column_statistics( - ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(150)))) - .with_distinct_count(Precision::Exact(100)), - ); + let schema = int32_test_schema(); + let stats_a = int32_ndv_stats(100, 0, 100, 100); + let stats_b = int32_ndv_stats(10, 40, 60, 10); + let stats_c = int32_ndv_stats(100, 50, 150, 100); let permutations = [ [&stats_a, &stats_b, &stats_c], From 7f05067c2b798195e01932ddaa8e4d480472129d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 2 Apr 2026 11:19:46 +0800 Subject: [PATCH 4/4] feat(stats): format function parameters for better readability in estimate_ndv_with_overlap_many function --- datafusion/common/src/stats.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index fc61d052684d1..15ae7f991c0ac 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -873,7 +873,9 @@ pub fn estimate_ndv_with_overlap( /// /// Returns `None` when any input lacks comparable min/max values or when /// distance is unsupported for the underlying scalar type. -fn estimate_ndv_with_overlap_many(inputs: &[(&ColumnStatistics, usize)]) -> Option { +fn estimate_ndv_with_overlap_many( + inputs: &[(&ColumnStatistics, usize)], +) -> Option { if inputs.is_empty() { return Some(0); }