From f695077b964efd4f55326761801af36f849decfb Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Tue, 5 May 2026 13:29:46 +0800 Subject: [PATCH] Display partition stats in manifests system table Signed-off-by: QuakeWang <1677980708@qq.com> --- crates/integrations/datafusion/Cargo.toml | 1 + .../datafusion/src/system_tables/manifests.rs | 162 +++++- .../datafusion/src/system_tables/mod.rs | 1 + .../src/system_tables/row_string_cast.rs | 517 ++++++++++++++++++ .../datafusion/tests/system_tables.rs | 77 +++ 5 files changed, 754 insertions(+), 4 deletions(-) create mode 100644 crates/integrations/datafusion/src/system_tables/row_string_cast.rs diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 832326e1..f5472d2d 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -41,6 +41,7 @@ futures = "0.3" serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { workspace = true, features = ["rt", "time", "fs"] } +lexical-write-float = "1.0.6" [dev-dependencies] arrow-array = { workspace = true } diff --git a/crates/integrations/datafusion/src/system_tables/manifests.rs b/crates/integrations/datafusion/src/system_tables/manifests.rs index bc808051..28f288a9 100644 --- a/crates/integrations/datafusion/src/system_tables/manifests.rs +++ b/crates/integrations/datafusion/src/system_tables/manifests.rs @@ -21,7 +21,7 @@ use std::any::Any; use std::sync::{Arc, OnceLock}; use async_trait::async_trait; -use datafusion::arrow::array::{new_null_array, Int64Array, RecordBatch, StringArray}; +use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::catalog::Session; use datafusion::datasource::memory::MemorySourceConfig; @@ -29,11 +29,15 @@ use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result as DFResult; use datafusion::logical_expr::Expr; use datafusion::physical_plan::ExecutionPlan; -use paimon::spec::{ManifestFileMeta, ManifestList}; +use paimon::spec::{BinaryRow, DataField, ManifestFileMeta, ManifestList}; use paimon::table::{SnapshotManager, Table}; +use super::row_string_cast::format_row_as_java_cast_string; use crate::error::to_datafusion_error; +const MIN_PARTITION_STATS_INDEX: usize = 5; +const MAX_PARTITION_STATS_INDEX: usize = 6; + pub(super) fn build(table: Table) -> DFResult> { Ok(Arc::new(ManifestsTable { table })) } @@ -93,15 +97,42 @@ impl TableProvider for ManifestsTable { let mut num_added = Vec::with_capacity(n); let mut num_deleted = Vec::with_capacity(n); let mut schema_ids = Vec::with_capacity(n); + let mut min_partition_stats: Vec> = Vec::with_capacity(n); + let mut max_partition_stats: Vec> = Vec::with_capacity(n); let mut min_row_ids: Vec> = Vec::with_capacity(n); let mut max_row_ids: Vec> = Vec::with_capacity(n); + let partition_fields = self.table.schema().partition_fields(); + let projected_columns = projection.map(Vec::as_slice); + let materialize_min_partition_stats = + should_materialize_column(projected_columns, MIN_PARTITION_STATS_INDEX); + let materialize_max_partition_stats = + should_materialize_column(projected_columns, MAX_PARTITION_STATS_INDEX); for meta in metas { + let stats = meta.partition_stats(); file_names.push(meta.file_name().to_string()); file_sizes.push(meta.file_size()); num_added.push(meta.num_added_files()); num_deleted.push(meta.num_deleted_files()); schema_ids.push(meta.schema_id()); + min_partition_stats.push( + materialize_partition_stats_value( + materialize_min_partition_stats, + stats.min_values(), + stats.null_counts(), + &partition_fields, + ) + .map_err(to_datafusion_error)?, + ); + max_partition_stats.push( + materialize_partition_stats_value( + materialize_max_partition_stats, + stats.max_values(), + stats.null_counts(), + &partition_fields, + ) + .map_err(to_datafusion_error)?, + ); min_row_ids.push(meta.min_row_id()); max_row_ids.push(meta.max_row_id()); } @@ -115,8 +146,8 @@ impl TableProvider for ManifestsTable { Arc::new(Int64Array::from(num_added)), Arc::new(Int64Array::from(num_deleted)), Arc::new(Int64Array::from(schema_ids)), - new_null_array(&DataType::Utf8, n), - new_null_array(&DataType::Utf8, n), + Arc::new(StringArray::from(min_partition_stats)), + Arc::new(StringArray::from(max_partition_stats)), Arc::new(Int64Array::from(min_row_ids)), Arc::new(Int64Array::from(max_row_ids)), ], @@ -157,3 +188,126 @@ async fn collect_manifests(table: &Table) -> paimon::Result, column_index: usize) -> bool { + match projection { + Some(projection) => projection.contains(&column_index), + None => true, + } +} + +fn materialize_partition_stats_value( + materialize: bool, + value_bytes: &[u8], + null_counts: &[Option], + partition_fields: &[DataField], +) -> paimon::Result> { + if materialize { + format_partition_stats_value(value_bytes, null_counts, partition_fields) + } else { + Ok(None) + } +} + +fn format_partition_stats_value( + value_bytes: &[u8], + null_counts: &[Option], + partition_fields: &[DataField], +) -> paimon::Result> { + if value_bytes.is_empty() { + return if partition_fields.is_empty() || null_counts.len() == partition_fields.len() { + Ok(Some(format_all_null_partition_row(partition_fields.len()))) + } else { + Ok(None) + }; + } + + let row = BinaryRow::from_serialized_bytes(value_bytes)?; + format_row_as_java_cast_string(&row, partition_fields).map(Some) +} + +fn format_all_null_partition_row(arity: usize) -> String { + if arity == 0 { + return "{}".to_string(); + } + format!("{{{}}}", vec!["null"; arity].join(", ")) +} + +#[cfg(test)] +mod tests { + use super::*; + use paimon::spec::{DataType as PaimonDataType, Datum, FloatType, IntType, VarCharType}; + + fn field(name: &str, data_type: PaimonDataType) -> DataField { + DataField::new(0, name.to_string(), data_type) + } + + fn serialized_row(values: &[(Option, PaimonDataType)]) -> Vec { + let refs: Vec<_> = values + .iter() + .map(|(datum, data_type)| (datum.as_ref(), data_type)) + .collect(); + BinaryRow::from_datums(&refs).to_serialized_bytes() + } + + #[test] + fn test_should_materialize_column() { + let projected_stats = vec![MIN_PARTITION_STATS_INDEX]; + let projected_without_stats = vec![0, 1, 2]; + + assert!(should_materialize_column(None, MIN_PARTITION_STATS_INDEX)); + assert!(should_materialize_column( + Some(projected_stats.as_slice()), + MIN_PARTITION_STATS_INDEX + )); + assert!(!should_materialize_column( + Some(projected_without_stats.as_slice()), + MIN_PARTITION_STATS_INDEX + )); + } + + #[test] + fn test_unprojected_partition_stats_are_not_formatted() { + let data_type = PaimonDataType::Float(FloatType::new()); + let fields = vec![field("pt", data_type.clone())]; + let bytes = serialized_row(&[(Some(Datum::Float(1.0)), data_type.clone())]); + + assert_eq!( + materialize_partition_stats_value(false, &bytes, &[Some(0)], &fields).unwrap(), + None + ); + assert_eq!( + materialize_partition_stats_value(true, &bytes, &[Some(0)], &fields).unwrap(), + Some("{1.0}".to_string()) + ); + } + + #[test] + fn test_format_empty_partition_row() { + assert_eq!( + format_partition_stats_value(&[], &[], &[]).unwrap(), + Some("{}".to_string()) + ); + } + + #[test] + fn test_format_empty_bytes_with_matching_null_counts_as_all_null() { + let fields = vec![ + field("pt1", PaimonDataType::Int(IntType::new())), + field("pt2", PaimonDataType::VarChar(VarCharType::string_type())), + ]; + assert_eq!( + format_partition_stats_value(&[], &[Some(2), Some(2)], &fields).unwrap(), + Some("{null, null}".to_string()) + ); + } + + #[test] + fn test_format_empty_bytes_with_mismatched_null_counts_as_unknown() { + let fields = vec![field("pt", PaimonDataType::Int(IntType::new()))]; + assert_eq!( + format_partition_stats_value(&[], &[], &fields).unwrap(), + None + ); + } +} diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs index 1bdc0d13..42336670 100644 --- a/crates/integrations/datafusion/src/system_tables/mod.rs +++ b/crates/integrations/datafusion/src/system_tables/mod.rs @@ -32,6 +32,7 @@ use crate::error::to_datafusion_error; mod branches; mod manifests; mod options; +mod row_string_cast; mod schemas; mod snapshots; mod tags; diff --git a/crates/integrations/datafusion/src/system_tables/row_string_cast.rs b/crates/integrations/datafusion/src/system_tables/row_string_cast.rs new file mode 100644 index 00000000..1501d1cf --- /dev/null +++ b/crates/integrations/datafusion/src/system_tables/row_string_cast.rs @@ -0,0 +1,517 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::num::NonZeroI32; + +use chrono::{Local, NaiveDate, NaiveDateTime, TimeZone, Timelike}; +use lexical_write_float::{format::STANDARD, Options, ToLexicalWithOptions}; +use paimon::spec::{BinaryRow, DataField, DataType, Datum}; +use paimon::{Error, Result}; + +const MILLIS_PER_DAY: i64 = 86_400_000; +const JAVA_FLOAT_OPTIONS: Options = Options::builder() + .positive_exponent_break(NonZeroI32::new(6)) + .negative_exponent_break(NonZeroI32::new(-3)) + .exponent(b'E') + .inf_string(Some(b"Infinity")) + .build_strict(); + +pub(super) fn format_row_as_java_cast_string( + row: &BinaryRow, + fields: &[DataField], +) -> Result { + validate_row(row, fields)?; + + let mut out = String::from("{"); + for (pos, field) in fields.iter().enumerate() { + if pos > 0 { + out.push_str(", "); + } + out.push_str(&format_field(row, pos, field.data_type())?); + } + out.push('}'); + Ok(out) +} + +fn validate_row(row: &BinaryRow, fields: &[DataField]) -> Result<()> { + if row.arity() < 0 { + return Err(data_invalid(format!( + "Row string cast row has negative arity {}", + row.arity() + ))); + } + + let arity = row.arity() as usize; + if arity != fields.len() { + return Err(data_invalid(format!( + "Row string cast row arity {arity} does not match field count {}", + fields.len() + ))); + } + + let min_size = BinaryRow::cal_fix_part_size_in_bytes(row.arity()) as usize; + if row.data().len() < min_size { + return Err(data_invalid(format!( + "Row string cast row data too short: need at least {min_size} bytes, got {}", + row.data().len() + ))); + } + + Ok(()) +} + +fn format_field(row: &BinaryRow, pos: usize, data_type: &DataType) -> Result { + let Some(datum) = row.get_datum(pos, data_type)? else { + return Ok("null".to_string()); + }; + + match (datum, data_type) { + (Datum::Bool(v), DataType::Boolean(_)) => Ok(v.to_string()), + (Datum::TinyInt(v), DataType::TinyInt(_)) => Ok(v.to_string()), + (Datum::SmallInt(v), DataType::SmallInt(_)) => Ok(v.to_string()), + (Datum::Int(v), DataType::Int(_)) => Ok(v.to_string()), + (Datum::Long(v), DataType::BigInt(_)) => Ok(v.to_string()), + (Datum::Float(v), DataType::Float(_)) => Ok(format_float(v)), + (Datum::Double(v), DataType::Double(_)) => Ok(format_double(v)), + (Datum::String(v), DataType::Char(_) | DataType::VarChar(_)) => Ok(v), + (Datum::Bytes(v), DataType::Binary(_) | DataType::VarBinary(_)) => { + Ok(String::from_utf8_lossy(&v).into_owned()) + } + (Datum::Date(v), DataType::Date(_)) => format_date(v), + (Datum::Time(v), DataType::Time(t)) => format_time(v, t.precision()), + ( + Datum::Decimal { + unscaled, scale, .. + }, + DataType::Decimal(_), + ) => Ok(format_decimal_plain(unscaled, scale)), + (Datum::Timestamp { millis, nanos }, DataType::Timestamp(t)) => { + format_timestamp(millis, nanos, t.precision()) + } + (Datum::LocalZonedTimestamp { millis, nanos }, DataType::LocalZonedTimestamp(t)) => { + format_local_zoned_timestamp(millis, nanos, t.precision()) + } + (datum, _) => Err(data_invalid(format!( + "Decoded row string cast datum {datum:?} does not match type {data_type:?}" + ))), + } +} + +fn format_float(value: f32) -> String { + const BUFFER_SIZE: usize = JAVA_FLOAT_OPTIONS.buffer_size_const::(); + let mut buffer = [0u8; BUFFER_SIZE]; + let bytes = value.to_lexical_with_options::(&mut buffer, &JAVA_FLOAT_OPTIONS); + std::str::from_utf8(bytes) + .expect("lexical float output is valid UTF-8") + .to_string() +} + +fn format_double(value: f64) -> String { + const BUFFER_SIZE: usize = JAVA_FLOAT_OPTIONS.buffer_size_const::(); + let mut buffer = [0u8; BUFFER_SIZE]; + let bytes = value.to_lexical_with_options::(&mut buffer, &JAVA_FLOAT_OPTIONS); + std::str::from_utf8(bytes) + .expect("lexical double output is valid UTF-8") + .to_string() +} + +fn format_date(epoch_days: i32) -> Result { + let ce_days = epoch_days.checked_add(719_163).ok_or_else(|| { + data_invalid(format!( + "Date row string cast value {epoch_days} is outside supported range" + )) + })?; + let date = NaiveDate::from_num_days_from_ce_opt(ce_days).ok_or_else(|| { + data_invalid(format!( + "Date row string cast value {epoch_days} is outside supported range" + )) + })?; + Ok(date.format("%Y-%m-%d").to_string()) +} + +fn format_time(millis_of_day: i32, precision: u32) -> Result { + let mut millis = millis_of_day as i64; + while millis < 0 { + millis += MILLIS_PER_DAY; + } + + let h = millis / 3_600_000; + let m = (millis % 3_600_000) / 60_000; + let s = (millis % 60_000) / 1_000; + let mut ms = millis % 1_000; + let mut out = format!("{h:02}:{m:02}:{s:02}"); + + if precision > 0 { + out.push('.'); + let mut remaining = precision; + while remaining > 0 { + out.push((b'0' + (ms / 100) as u8) as char); + ms = (ms % 100) * 10; + if ms == 0 { + break; + } + remaining -= 1; + } + } + + Ok(out) +} + +fn format_decimal_plain(unscaled: i128, scale: u32) -> String { + if scale == 0 { + return unscaled.to_string(); + } + + let negative = unscaled < 0; + let abs = if unscaled == i128::MIN { + (i128::MAX as u128) + 1 + } else { + unscaled.unsigned_abs() + }; + + let digits = abs.to_string(); + let scale = scale as usize; + let result = if digits.len() <= scale { + let mut s = String::with_capacity(scale + 2); + s.push_str("0."); + for _ in 0..(scale - digits.len()) { + s.push('0'); + } + s.push_str(&digits); + s + } else { + let int_len = digits.len() - scale; + let mut s = String::with_capacity(digits.len() + 1); + s.push_str(&digits[..int_len]); + s.push('.'); + s.push_str(&digits[int_len..]); + s + }; + + if negative { + format!("-{result}") + } else { + result + } +} + +fn format_timestamp(millis: i64, nano_of_milli: i32, precision: u32) -> Result { + format_timestamp_naive(millis_to_naive_datetime(millis, nano_of_milli)?, precision) +} + +fn format_local_zoned_timestamp(millis: i64, nano_of_milli: i32, precision: u32) -> Result { + let nanos = timestamp_nanos(millis, nano_of_milli)?; + let secs = millis.div_euclid(1000); + let local = Local + .timestamp_opt(secs, nanos) + .single() + .ok_or_else(|| data_invalid(format!("Invalid local zoned timestamp millis {millis}")))?; + format_timestamp_naive(local.naive_local(), precision) +} + +fn format_timestamp_naive(dt: NaiveDateTime, precision: u32) -> Result { + let precision = usize::try_from(precision).map_err(|e| Error::DataInvalid { + message: format!("Timestamp row string cast precision {precision} is invalid"), + source: Some(Box::new(e)), + })?; + if precision > 9 { + return Err(data_invalid(format!( + "Timestamp row string cast precision {precision} is outside 0..=9" + ))); + } + + let mut out = dt.format("%Y-%m-%d %H:%M:%S").to_string(); + if precision > 0 { + let fraction = format!("{:09}", dt.nanosecond()); + out.push('.'); + out.push_str(&fraction[..precision]); + } + Ok(out) +} + +fn millis_to_naive_datetime(millis: i64, nano_of_milli: i32) -> Result { + let nanos = timestamp_nanos(millis, nano_of_milli)?; + let days = millis.div_euclid(MILLIS_PER_DAY); + let millis_of_day = millis.rem_euclid(MILLIS_PER_DAY) as u64; + let nano_of_day = millis_of_day * 1_000_000 + u64::from(nanos % 1_000_000); + let ce_days = days.checked_add(719_163).ok_or_else(|| { + data_invalid(format!( + "Timestamp row string cast millis {millis} is outside supported range" + )) + })?; + let ce_days = i32::try_from(ce_days).map_err(|e| Error::DataInvalid { + message: format!("Timestamp row string cast millis {millis} is outside supported range"), + source: Some(Box::new(e)), + })?; + let date = NaiveDate::from_num_days_from_ce_opt(ce_days).ok_or_else(|| { + data_invalid(format!( + "Timestamp row string cast millis {millis} is outside supported range" + )) + })?; + let time = chrono::NaiveTime::from_num_seconds_from_midnight_opt( + (nano_of_day / 1_000_000_000) as u32, + (nano_of_day % 1_000_000_000) as u32, + ) + .ok_or_else(|| data_invalid(format!("Invalid timestamp millis {millis}")))?; + Ok(NaiveDateTime::new(date, time)) +} + +fn timestamp_nanos(millis: i64, nano_of_milli: i32) -> Result { + if !(0..=999_999).contains(&nano_of_milli) { + return Err(data_invalid(format!( + "Timestamp nano-of-millisecond {nano_of_milli} is outside 0..=999999" + ))); + } + Ok(millis.rem_euclid(1000) as u32 * 1_000_000 + nano_of_milli as u32) +} + +fn data_invalid(message: String) -> Error { + Error::DataInvalid { + message, + source: None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use paimon::spec::{ + BigIntType, BinaryType, BlobType, BooleanType, CharType, DateType, DecimalType, DoubleType, + FloatType, IntType, LocalZonedTimestampType, SmallIntType, TimeType, TimestampType, + TinyIntType, VarBinaryType, VarCharType, + }; + + fn field(id: i32, data_type: DataType) -> DataField { + DataField::new(id, format!("f{id}"), data_type) + } + + fn row(values: &[(Option, DataType)]) -> BinaryRow { + let refs: Vec<_> = values + .iter() + .map(|(datum, data_type)| (datum.as_ref(), data_type)) + .collect(); + BinaryRow::from_datums(&refs) + } + + fn format_value(values: &[(Option, DataType)]) -> Result { + let fields: Vec<_> = values + .iter() + .enumerate() + .map(|(i, (_, data_type))| field(i as i32, data_type.clone())) + .collect(); + format_row_as_java_cast_string(&row(values), &fields) + } + + #[test] + fn test_format_supported_scalar_types() { + let values = vec![ + ( + Some(Datum::Bool(true)), + DataType::Boolean(BooleanType::new()), + ), + ( + Some(Datum::TinyInt(-1)), + DataType::TinyInt(TinyIntType::new()), + ), + ( + Some(Datum::SmallInt(2)), + DataType::SmallInt(SmallIntType::new()), + ), + (Some(Datum::Int(3)), DataType::Int(IntType::new())), + (Some(Datum::Long(4)), DataType::BigInt(BigIntType::new())), + (Some(Datum::Float(1.0)), DataType::Float(FloatType::new())), + ( + Some(Datum::Double(10_000_000.0)), + DataType::Double(DoubleType::new()), + ), + ( + Some(Datum::String("c".to_string())), + DataType::Char(CharType::new(1).unwrap()), + ), + ( + Some(Datum::Bytes(b"xy".to_vec())), + DataType::Binary(BinaryType::new(2).unwrap()), + ), + ( + Some(Datum::Bytes(b"abc".to_vec())), + DataType::VarBinary(VarBinaryType::new(3).unwrap()), + ), + (Some(Datum::Date(19_723)), DataType::Date(DateType::new())), + ( + Some(Datum::Time(45_296_000)), + DataType::Time(TimeType::new(3).unwrap()), + ), + ( + Some(Datum::Decimal { + unscaled: -100, + precision: 10, + scale: 3, + }), + DataType::Decimal(DecimalType::new(10, 3).unwrap()), + ), + ( + Some(Datum::Timestamp { + millis: 1_704_110_400_123, + nanos: 456_000, + }), + DataType::Timestamp(TimestampType::new(6).unwrap()), + ), + ]; + + assert_eq!( + format_value(&values).unwrap(), + "{true, -1, 2, 3, 4, 1.0, 1.0E7, c, xy, abc, 2024-01-01, 12:34:56.0, -0.100, 2024-01-01 12:00:00.123456}" + ); + } + + #[test] + fn test_format_float_double_uses_java_display_thresholds() { + let values = vec![ + ( + Some(Datum::Double(9_999_999.0)), + DataType::Double(DoubleType::new()), + ), + ( + Some(Datum::Double(10_000_000.0)), + DataType::Double(DoubleType::new()), + ), + ( + Some(Datum::Double(0.001)), + DataType::Double(DoubleType::new()), + ), + ( + Some(Datum::Double(0.000_999_999)), + DataType::Double(DoubleType::new()), + ), + (Some(Datum::Float(-0.0)), DataType::Float(FloatType::new())), + ( + Some(Datum::Double(f64::INFINITY)), + DataType::Double(DoubleType::new()), + ), + ( + Some(Datum::Double(f64::NEG_INFINITY)), + DataType::Double(DoubleType::new()), + ), + ( + Some(Datum::Double(f64::NAN)), + DataType::Double(DoubleType::new()), + ), + ]; + + assert_eq!( + format_value(&values).unwrap(), + "{9999999.0, 1.0E7, 0.001, 9.99999E-4, -0.0, Infinity, -Infinity, NaN}" + ); + } + + #[test] + fn test_format_binary_invalid_utf8_uses_lossy_string() { + let bytes = vec![0xff]; + let data_type = DataType::VarBinary(VarBinaryType::new(1).unwrap()); + let expected = String::from_utf8_lossy(&bytes).into_owned(); + + assert_eq!( + format_value(&[(Some(Datum::Bytes(bytes)), data_type)]).unwrap(), + format!("{{{expected}}}") + ); + } + + #[test] + fn test_format_null_values() { + let values = vec![ + (None, DataType::Int(IntType::new())), + (None, DataType::VarChar(VarCharType::string_type())), + ]; + + assert_eq!(format_value(&values).unwrap(), "{null, null}"); + } + + #[test] + fn test_format_timestamp_precision_matches_java_cast() { + let values = vec![ + ( + Some(Datum::Timestamp { + millis: 1_704_110_400_123, + nanos: 456_000, + }), + DataType::Timestamp(TimestampType::new(3).unwrap()), + ), + ( + Some(Datum::Timestamp { + millis: 1_704_110_400_123, + nanos: 456_000, + }), + DataType::Timestamp(TimestampType::new(6).unwrap()), + ), + ]; + + assert_eq!( + format_value(&values).unwrap(), + "{2024-01-01 12:00:00.123, 2024-01-01 12:00:00.123456}" + ); + } + + #[test] + fn test_format_local_zoned_timestamp() { + let data_type = DataType::LocalZonedTimestamp(LocalZonedTimestampType::new(3).unwrap()); + let millis = 1_704_067_200_000; + let expected = Local + .timestamp_opt(millis / 1000, 0) + .single() + .map(|dt| format!("{}.000", dt.format("%Y-%m-%d %H:%M:%S"))) + .unwrap(); + + assert_eq!( + format_value(&[( + Some(Datum::LocalZonedTimestamp { millis, nanos: 0 }), + data_type, + )]) + .unwrap(), + format!("{{{expected}}}") + ); + } + + #[test] + fn test_format_unsupported_type_returns_error() { + let data_type = DataType::Blob(BlobType::new()); + let err = format_value(&[(Some(Datum::Bytes(b"x".to_vec())), data_type)]) + .expect_err("blob row string cast should be unsupported"); + assert!(matches!(err, Error::Unsupported { .. })); + } + + #[test] + fn test_format_arity_mismatch_returns_error() { + let int_type = DataType::Int(IntType::new()); + let row = row(&[(Some(Datum::Int(1)), int_type.clone())]); + let fields = vec![ + field(0, int_type.clone()), + field(1, DataType::Int(IntType::new())), + ]; + let err = + format_row_as_java_cast_string(&row, &fields).expect_err("arity mismatch should fail"); + assert!(err.to_string().contains("arity")); + } + + #[test] + fn test_format_truncated_row_returns_error() { + let row = BinaryRow::from_bytes(1, vec![0, 0, 0, 0]); + let fields = vec![field(0, DataType::Int(IntType::new()))]; + let err = + format_row_as_java_cast_string(&row, &fields).expect_err("truncated row should fail"); + assert!(err.to_string().contains("too short")); + } +} diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs index 0d0afa40..a3f386e0 100644 --- a/crates/integrations/datafusion/tests/system_tables.rs +++ b/crates/integrations/datafusion/tests/system_tables.rs @@ -17,6 +17,8 @@ //! Paimon system tables end-to-end via DataFusion SQL. +mod common; + use std::sync::Arc; use datafusion::arrow::array::{Array, Int64Array, StringArray}; @@ -547,3 +549,78 @@ async fn test_manifests_system_table() { "$manifests rows should match base + delta + changelog manifest entries of the latest snapshot" ); } + +#[tokio::test] +async fn test_manifests_system_table_partition_stats() { + let (_tmp, handler) = common::setup_handler().await; + common::exec( + &handler, + "CREATE TABLE paimon.test_db.manifest_stats (id INT, pt INT) PARTITIONED BY (pt)", + ) + .await; + common::exec( + &handler, + "INSERT INTO paimon.test_db.manifest_stats VALUES (1, 1), (2, 2)", + ) + .await; + + let batches = handler + .sql( + "SELECT min_partition_stats, max_partition_stats \ + FROM paimon.test_db.manifest_stats$manifests", + ) + .await + .expect("$manifests query should plan") + .collect() + .await + .expect("$manifests query should execute"); + + let mut stats = Vec::new(); + for batch in &batches { + let mins = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("min_partition_stats is Utf8"); + let maxs = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("max_partition_stats is Utf8"); + for row in 0..batch.num_rows() { + stats.push(( + (!mins.is_null(row)).then(|| mins.value(row).to_string()), + (!maxs.is_null(row)).then(|| maxs.value(row).to_string()), + )); + } + } + stats.sort(); + + assert!( + !stats.is_empty(), + "$manifests should return partition stats" + ); + + let min_partition = stats + .iter() + .filter_map(|(min, _)| min.as_deref()) + .map(single_int_partition_stat) + .min(); + let max_partition = stats + .iter() + .filter_map(|(_, max)| max.as_deref()) + .map(single_int_partition_stat) + .max(); + + assert_eq!(min_partition, Some(1)); + assert_eq!(max_partition, Some(2)); +} + +fn single_int_partition_stat(value: &str) -> i32 { + value + .strip_prefix('{') + .and_then(|s| s.strip_suffix('}')) + .expect("partition stats should use row cast braces") + .parse() + .expect("partition stats should contain one int partition value") +}