From b0f7d9f765ce9393341f61a2331f6f67da495f96 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 May 2026 09:52:14 -0600 Subject: [PATCH 1/2] feat(spark): port Spark-compatible Parquet schema adapter from Comet Adds a `parquet` feature to `datafusion-spark` exposing a `SparkPhysicalExprAdapterFactory` that mirrors Apache Spark's vectorized Parquet reader semantics. Faithful port of `apache/datafusion-comet`'s schema adapter on apache/main. Closes #22339 Co-Authored-By: Claude Opus 4.7 --- Cargo.lock | 8 + datafusion/spark/Cargo.toml | 22 +- datafusion/spark/src/lib.rs | 2 + datafusion/spark/src/parquet/cast_column.rs | 520 +++++ datafusion/spark/src/parquet/error.rs | 114 ++ datafusion/spark/src/parquet/mod.rs | 71 + datafusion/spark/src/parquet/options.rs | 125 ++ .../spark/src/parquet/parquet_support.rs | 359 ++++ .../spark/src/parquet/schema_adapter.rs | 1728 +++++++++++++++++ 9 files changed, 2947 insertions(+), 2 deletions(-) create mode 100644 datafusion/spark/src/parquet/cast_column.rs create mode 100644 datafusion/spark/src/parquet/error.rs create mode 100644 datafusion/spark/src/parquet/mod.rs create mode 100644 datafusion/spark/src/parquet/options.rs create mode 100644 datafusion/spark/src/parquet/parquet_support.rs create mode 100644 datafusion/spark/src/parquet/schema_adapter.rs diff --git a/Cargo.lock b/Cargo.lock index c18fd2012891c..d2e9b35e5c4bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2541,18 +2541,26 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-expr-common", "datafusion-functions", "datafusion-functions-aggregate", "datafusion-functions-aggregate-common", "datafusion-functions-nested", + "datafusion-physical-expr", + "datafusion-physical-expr-adapter", + "datafusion-physical-expr-common", + "futures", "log", "num-traits", + "parquet", "percent-encoding", "rand 0.9.4", "serde_json", "sha1 0.11.0", "sha2", + "tokio", "url", + "uuid", ] [[package]] diff --git a/datafusion/spark/Cargo.toml 
b/datafusion/spark/Cargo.toml index 72bca58b7a2cf..8956579075d70 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -32,6 +32,15 @@ all-features = true [features] default = [] core = ["datafusion"] +# Spark-compatible Parquet schema adapter (see `parquet` module). +parquet = [ + "dep:datafusion-physical-expr", + "dep:datafusion-physical-expr-adapter", + "dep:datafusion-physical-expr-common", + "dep:datafusion-expr-common", + "dep:parquet", + "dep:uuid", +] # Note: add additional linter rules in lib.rs. # Rust does not support workspace + new linter rules in subcrates yet @@ -53,24 +62,33 @@ datafusion-catalog = { workspace = true } datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-expr-common = { workspace = true, optional = true } datafusion-functions = { workspace = true, features = ["crypto_expressions"] } datafusion-functions-aggregate = { workspace = true } datafusion-functions-aggregate-common = { workspace = true } datafusion-functions-nested = { workspace = true } +datafusion-physical-expr = { workspace = true, optional = true } +datafusion-physical-expr-adapter = { workspace = true, optional = true } +datafusion-physical-expr-common = { workspace = true, optional = true } log = { workspace = true } num-traits = { workspace = true } +parquet = { workspace = true, optional = true } percent-encoding = "2.3.2" rand = { workspace = true } serde_json = { workspace = true } sha1 = "0.11" sha2 = { workspace = true } url = { workspace = true } +uuid = { workspace = true, optional = true } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } criterion = { workspace = true } -# for SessionStateBuilderSpark tests -datafusion = { workspace = true, default-features = false } +# for SessionStateBuilderSpark tests and Parquet schema-adapter tests +datafusion = { workspace = true, default-features = false, features = ["parquet"] } +futures 
= { workspace = true } +rand = { workspace = true } +tokio = { workspace = true } [[bench]] harness = false diff --git a/datafusion/spark/src/lib.rs b/datafusion/spark/src/lib.rs index 2eee94c52ef78..8154bb79f2abb 100644 --- a/datafusion/spark/src/lib.rs +++ b/datafusion/spark/src/lib.rs @@ -129,6 +129,8 @@ //! for an example. pub mod function; +#[cfg(feature = "parquet")] +pub mod parquet; pub mod planner; #[cfg(feature = "core")] diff --git a/datafusion/spark/src/parquet/cast_column.rs b/datafusion/spark/src/parquet/cast_column.rs new file mode 100644 index 0000000000000..eb11deb63e82c --- /dev/null +++ b/datafusion/spark/src/parquet/cast_column.rs @@ -0,0 +1,520 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`SparkCastColumnExpr`] adapts a single column read from a Parquet file to +//! the type expected by the query, applying Spark-compatible conversion logic +//! for nested types and a handful of primitive cases that Arrow's cast does +//! not get right for Spark. 
+
+use std::fmt::{self, Display};
+use std::hash::Hash;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, LargeListArray, ListArray, MapArray, StructArray,
+    TimestampMicrosecondArray, TimestampMillisecondArray, make_array,
+};
+use arrow::compute::CastOptions;
+use arrow::datatypes::{DataType, FieldRef, Schema, TimeUnit};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::format::DEFAULT_CAST_OPTIONS;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr_common::columnar_value::ColumnarValue;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+
+use super::options::SparkParquetOptions;
+use super::parquet_support::spark_parquet_convert;
+
+/// True if two data types are structurally equivalent (same buffer layout)
+/// but may differ in the *names* of nested fields. Used to detect cases
+/// where we only need to relabel an array's metadata rather than convert it.
+fn types_differ_only_in_field_names(physical: &DataType, logical: &DataType) -> bool {
+    match (physical, logical) {
+        (DataType::List(pf), DataType::List(lf)) => {
+            pf.is_nullable() == lf.is_nullable()
+                && (pf.data_type() == lf.data_type()
+                    || types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
+        }
+        (DataType::LargeList(pf), DataType::LargeList(lf)) => {
+            pf.is_nullable() == lf.is_nullable()
+                && (pf.data_type() == lf.data_type()
+                    || types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
+        }
+        (DataType::Map(pf, p_sorted), DataType::Map(lf, l_sorted)) => {
+            p_sorted == l_sorted
+                && pf.is_nullable() == lf.is_nullable()
+                && (pf.data_type() == lf.data_type()
+                    || types_differ_only_in_field_names(pf.data_type(), lf.data_type()))
+        }
+        (DataType::Struct(pfields), DataType::Struct(lfields)) => {
+            // For Struct types, field names are semantically meaningful (they
+            // identify different columns), so we require name equality here.
+            // This distinguishes from List/Map wrapper field names ("item" vs
+            // "element") which are purely cosmetic.
+            pfields.len() == lfields.len()
+                && pfields.iter().zip(lfields.iter()).all(|(pf, lf)| {
+                    pf.name() == lf.name()
+                        && pf.is_nullable() == lf.is_nullable()
+                        && (pf.data_type() == lf.data_type()
+                            || types_differ_only_in_field_names(
+                                pf.data_type(),
+                                lf.data_type(),
+                            ))
+                })
+        }
+        _ => false,
+    }
+}
+
+/// Recursively relabel an array so its `DataType` matches `target_type`.
+/// Only changes metadata (field names, nullability flags in nested fields).
+/// Does NOT change the underlying buffer data.
+fn relabel_array(array: ArrayRef, target_type: &DataType) -> ArrayRef {
+    if array.data_type() == target_type {
+        return array;
+    }
+    match target_type {
+        DataType::List(target_field) => {
+            let list = array.as_any().downcast_ref::<ListArray>().unwrap();
+            let values =
+                relabel_array(Arc::clone(list.values()), target_field.data_type());
+            Arc::new(ListArray::new(
+                Arc::clone(target_field),
+                list.offsets().clone(),
+                values,
+                list.nulls().cloned(),
+            ))
+        }
+        DataType::LargeList(target_field) => {
+            let list = array.as_any().downcast_ref::<LargeListArray>().unwrap();
+            let values =
+                relabel_array(Arc::clone(list.values()), target_field.data_type());
+            Arc::new(LargeListArray::new(
+                Arc::clone(target_field),
+                list.offsets().clone(),
+                values,
+                list.nulls().cloned(),
+            ))
+        }
+        DataType::Map(target_entries_field, sorted) => {
+            let map = array.as_any().downcast_ref::<MapArray>().unwrap();
+            let entries = relabel_array(
+                Arc::new(map.entries().clone()),
+                target_entries_field.data_type(),
+            );
+            let entries_struct = entries.as_any().downcast_ref::<StructArray>().unwrap();
+            Arc::new(MapArray::new(
+                Arc::clone(target_entries_field),
+                map.offsets().clone(),
+                entries_struct.clone(),
+                map.nulls().cloned(),
+                *sorted,
+            ))
+        }
+        DataType::Struct(target_fields) => {
+            let struct_arr = array.as_any().downcast_ref::<StructArray>().unwrap();
+            let columns: Vec<ArrayRef> = target_fields
+                .iter()
+                .zip(struct_arr.columns())
+                .map(|(tf, col)| relabel_array(Arc::clone(col), tf.data_type()))
+                .collect();
+            Arc::new(StructArray::new(
+                target_fields.clone(),
+                columns,
+                struct_arr.nulls().cloned(),
+            ))
+        }
+        // Primitive types - shallow swap is safe.
+        _ => {
+            let data = array.to_data();
+            let new_data = data
+                .into_builder()
+                .data_type(target_type.clone())
+                .build()
+                .expect("relabel_array: data layout must be compatible");
+            make_array(new_data)
+        }
+    }
+}
+
+/// Casts `Timestamp(Microsecond)` to `Timestamp(Millisecond)` by floor-dividing by
+/// 1000 (like Spark's `Math.floorDiv`). Uses the timezone from the target type.
+fn cast_timestamp_micros_to_millis_array(
+    array: &ArrayRef,
+    target_tz: Option<Arc<str>>,
+) -> ArrayRef {
+    let micros_array = array
+        .as_any()
+        .downcast_ref::<TimestampMicrosecondArray>()
+        .expect("Expected TimestampMicrosecondArray");
+
+    let millis_values: TimestampMillisecondArray =
+        arrow::compute::kernels::arity::unary(micros_array, |v| v.div_euclid(1000));
+
+    let result = if let Some(tz) = target_tz {
+        millis_values.with_timezone(tz)
+    } else {
+        millis_values
+    };
+
+    Arc::new(result)
+}
+
+/// Casts a `Timestamp(Microsecond)` scalar to `Timestamp(Millisecond)` by floor-
+/// dividing by 1000 (pre-epoch values round down). Uses the target type's timezone.
+fn cast_timestamp_micros_to_millis_scalar(
+    opt_val: Option<i64>,
+    target_tz: Option<Arc<str>>,
+) -> ScalarValue {
+    let new_val = opt_val.map(|v| v.div_euclid(1000));
+    ScalarValue::TimestampMillisecond(new_val, target_tz)
+}
+
+/// A column-level cast that adapts a Parquet column to its requested type
+/// using Spark semantics for nested types and a handful of primitive cases
+/// that Arrow's cast does not handle correctly for Spark (e.g. timestamp
+/// micros → millis using Spark's floor-division semantics, or struct/list/map
+/// adaptation that follows Spark's `clipParquetGroupFields`).
+#[derive(Debug, Clone, Eq)]
+pub struct SparkCastColumnExpr {
+    /// The physical expression producing the value to cast.
+    expr: Arc<dyn PhysicalExpr>,
+    /// The physical field of the input column.
+    input_physical_field: FieldRef,
+    /// The field type required by the query.
+    target_field: FieldRef,
+    /// Options forwarded to Arrow cast.
+    cast_options: CastOptions<'static>,
+    /// Spark parquet options for complex nested type conversions.
+    /// When present, enables [`spark_parquet_convert`] as a fallback.
+    parquet_options: Option<SparkParquetOptions>,
+}
+
+// Manually derive `PartialEq`/`Hash` because `Arc<dyn PhysicalExpr>` does not
+// implement them by default for the trait object.
+impl PartialEq for SparkCastColumnExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.expr.eq(&other.expr)
+            && self.input_physical_field.eq(&other.input_physical_field)
+            && self.target_field.eq(&other.target_field)
+            && self.cast_options.eq(&other.cast_options)
+            && self.parquet_options.eq(&other.parquet_options)
+    }
+}
+
+impl Hash for SparkCastColumnExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.expr.hash(state);
+        self.input_physical_field.hash(state);
+        self.target_field.hash(state);
+        self.cast_options.hash(state);
+        self.parquet_options.hash(state);
+    }
+}
+
+impl SparkCastColumnExpr {
+    /// Create a new [`SparkCastColumnExpr`].
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        physical_field: FieldRef,
+        target_field: FieldRef,
+        cast_options: Option<CastOptions<'static>>,
+    ) -> Self {
+        Self {
+            expr,
+            input_physical_field: physical_field,
+            target_field,
+            cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS),
+            parquet_options: None,
+        }
+    }
+
+    /// Attach Spark parquet options to enable complex nested type conversions.
+    pub fn with_parquet_options(mut self, options: SparkParquetOptions) -> Self {
+        self.parquet_options = Some(options);
+        self
+    }
+}
+
+impl Display for SparkCastColumnExpr {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "SPARK_CAST_COLUMN({} AS {})",
+            self.expr,
+            self.target_field.data_type()
+        )
+    }
+}
+
+impl PhysicalExpr for SparkCastColumnExpr {
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(self.target_field.data_type().clone())
+    }
+
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        Ok(self.target_field.is_nullable())
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = self.expr.evaluate(batch)?;
+
+        // Use `==` (PartialEq) instead of `equals_datatype` because
+        // `equals_datatype` ignores field names in nested types (Struct,
+        // List, Map). We need to detect when field names differ (e.g.,
+        // Struct("a","b") vs Struct("c","d")) so that we can apply
+        // `spark_parquet_convert` for field-name-based selection.
+        if value.data_type() == *self.target_field.data_type() {
+            return Ok(value);
+        }
+
+        let input_physical_field = self.input_physical_field.data_type();
+        let target_field = self.target_field.data_type();
+
+        match (input_physical_field, target_field) {
+            // Timestamp(Microsecond) -> Timestamp(Millisecond)
+            (
+                DataType::Timestamp(TimeUnit::Microsecond, _),
+                DataType::Timestamp(TimeUnit::Millisecond, target_tz),
+            ) => match value {
+                ColumnarValue::Array(array) => {
+                    let casted =
+                        cast_timestamp_micros_to_millis_array(&array, target_tz.clone());
+                    Ok(ColumnarValue::Array(casted))
+                }
+                ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(opt_val, _)) => {
+                    let casted = cast_timestamp_micros_to_millis_scalar(
+                        opt_val,
+                        target_tz.clone(),
+                    );
+                    Ok(ColumnarValue::Scalar(casted))
+                }
+                _ => Ok(value),
+            },
+            // Nested types that differ only in field names (e.g., List
+            // element named "item" vs "element", or Map entries named
+            // "key_value" vs "entries"). Re-label the array so the DataType
+            // metadata matches the logical schema.
+            (physical, logical)
+                if physical != logical
+                    && types_differ_only_in_field_names(physical, logical) =>
+            {
+                match value {
+                    ColumnarValue::Array(array) => {
+                        let relabeled = relabel_array(array, logical);
+                        Ok(ColumnarValue::Array(relabeled))
+                    }
+                    other => Ok(other),
+                }
+            }
+            // Fallback: use spark_parquet_convert for complex nested type
+            // conversions (e.g., List<Struct<a, b, c>> → List<Struct<a, c>>,
+            // map field selection, etc.).
+            _ => {
+                if let Some(parquet_options) = &self.parquet_options {
+                    let converted =
+                        spark_parquet_convert(value, target_field, parquet_options)?;
+                    Ok(converted)
+                } else {
+                    Ok(value)
+                }
+            }
+        }
+    }
+
+    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
+        Ok(Arc::clone(&self.target_field))
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.expr]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        assert_eq!(children.len(), 1);
+        let child = children.pop().expect("SparkCastColumnExpr child");
+        let mut new_expr = Self::new(
+            child,
+            Arc::clone(&self.input_physical_field),
+            Arc::clone(&self.target_field),
+            Some(self.cast_options.clone()),
+        );
+        if let Some(opts) = &self.parquet_options {
+            new_expr = new_expr.with_parquet_options(opts.clone());
+        }
+        Ok(Arc::new(new_expr))
+    }
+
+    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        Display::fmt(self, f)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Int32Array, StringArray};
+    use arrow::buffer::OffsetBuffer;
+    use arrow::datatypes::{Field, Fields};
+    use datafusion_physical_expr::expressions::Column;
+
+    #[test]
+    fn test_cast_timestamp_micros_to_millis_array() {
+        let micros_array: TimestampMicrosecondArray = vec![
+            Some(1_000_000),
+            Some(2_500_000),
+            None,
+            Some(0),
+            Some(-1_000_001), // pre-epoch, not a multiple of 1000: must floor
+        ]
+        .into();
+        let array_ref: ArrayRef = Arc::new(micros_array);
+
+        let result = cast_timestamp_micros_to_millis_array(&array_ref, None);
+        let millis_array = result
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .expect("Expected TimestampMillisecondArray");
+
+        assert_eq!(millis_array.len(), 5);
+        assert_eq!(millis_array.value(0), 1000);
+        assert_eq!(millis_array.value(1), 2500);
+        assert!(millis_array.is_null(2));
+        assert_eq!(millis_array.value(3), 0);
+        assert_eq!(millis_array.value(4), -1001);
+    }
+
+    #[test]
+    fn test_cast_timestamp_micros_to_millis_scalar() {
+        let result = cast_timestamp_micros_to_millis_scalar(Some(1_500_000), None);
+        assert_eq!(result, ScalarValue::TimestampMillisecond(Some(1500), None));
+
+        let null_result = cast_timestamp_micros_to_millis_scalar(None, None);
+        assert_eq!(null_result, ScalarValue::TimestampMillisecond(None, None));
+
+        let target_tz: Option<Arc<str>> = Some(Arc::from("UTC"));
+        let tz_result =
+            cast_timestamp_micros_to_millis_scalar(Some(2_000_000), target_tz.clone());
+        assert_eq!(
+            tz_result,
+            ScalarValue::TimestampMillisecond(Some(2000), target_tz)
+        );
+    }
+
+    #[test]
+    fn test_relabel_list_field_name() {
+        // Physical: List(Field("item", Int32))
+        // Logical:  List(Field("element", Int32))
+        let physical_field = Arc::new(Field::new("item", DataType::Int32, true));
+        let logical_field = Arc::new(Field::new("element", DataType::Int32, true));
+
+        let values = Int32Array::from(vec![1, 2, 3]);
+        let list = ListArray::new(
+            physical_field,
+            OffsetBuffer::new(vec![0, 2, 3].into()),
+            Arc::new(values),
+            None,
+        );
+        let array: ArrayRef = Arc::new(list);
+
+        let target_type = DataType::List(Arc::clone(&logical_field));
+        let result = relabel_array(array, &target_type);
+        assert_eq!(result.data_type(), &target_type);
+    }
+
+    #[test]
+    fn test_relabel_map_entries_field_name() {
+        // Physical: Map(Field("key_value", Struct{key, value}))
+        // Logical:  Map(Field("entries", Struct{key, value}))
+        let key_field = Arc::new(Field::new("key", DataType::Utf8, false));
+        let value_field = Arc::new(Field::new("value", DataType::Int32, true));
+        let struct_fields =
+            Fields::from(vec![Arc::clone(&key_field), Arc::clone(&value_field)]);
+
+        let physical_entries_field = Arc::new(Field::new(
+            "key_value",
+            DataType::Struct(struct_fields.clone()),
+            false,
+        ));
+        let logical_entries_field = Arc::new(Field::new(
+            "entries",
+            DataType::Struct(struct_fields.clone()),
+            false,
+        ));
+
+        let keys = StringArray::from(vec!["a", "b"]);
+        let values = Int32Array::from(vec![1, 2]);
+        let entries =
+            StructArray::new(struct_fields, vec![Arc::new(keys), Arc::new(values)], None);
+        let map = MapArray::new(
+            physical_entries_field,
+            OffsetBuffer::new(vec![0, 2].into()),
+            entries,
+            None,
+            false,
+        );
+        let array: ArrayRef = Arc::new(map);
+
+        let target_type = DataType::Map(logical_entries_field, false);
+        let result = relabel_array(array, &target_type);
+        assert_eq!(result.data_type(), &target_type);
+    }
+
+    #[test]
+    fn test_evaluate_micros_to_millis_array() {
+        let input_field = Arc::new(Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        ));
+        let schema = Schema::new(vec![Arc::clone(&input_field)]);
+
+        let target_field = Arc::new(Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            true,
+        ));
+
+        let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("ts", 0));
+
+        let cast_expr =
+            SparkCastColumnExpr::new(col_expr, input_field, target_field, None);
+
+        let micros_array: TimestampMicrosecondArray =
+            vec![Some(1_000_000), Some(2_000_000), None].into();
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(micros_array)]).unwrap();
+
+        let result = cast_expr.evaluate(&batch).unwrap();
+
+        match result {
+            ColumnarValue::Array(arr) => {
+                let millis_array = arr
+                    .as_any()
+                    .downcast_ref::<TimestampMillisecondArray>()
+                    .expect("Expected TimestampMillisecondArray");
+                assert_eq!(millis_array.value(0), 1000);
+                assert_eq!(millis_array.value(1), 2000);
+                assert!(millis_array.is_null(2));
+            }
+            _ => panic!("Expected Array result"),
+        }
+    }
+}
diff --git a/datafusion/spark/src/parquet/error.rs
b/datafusion/spark/src/parquet/error.rs
new file mode 100644
index 0000000000000..8bade0f25b826
--- /dev/null
+++ b/datafusion/spark/src/parquet/error.rs
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Errors raised by the Spark-compatible Parquet schema adapter.
+//!
+//! These mirror the small subset of [Spark's error taxonomy] that the
+//! vectorized Parquet reader can raise. They are surfaced through
+//! [`datafusion_common::DataFusionError::External`] so callers that bridge
+//! to a Spark JVM (e.g. Comet) can pattern-match on the inner type and
+//! convert to the matching Spark exception class.
+//!
+//! [Spark's error taxonomy]: https://spark.apache.org/docs/latest/sql-error-conditions.html
+
+use std::error::Error;
+use std::fmt;
+
+use datafusion_common::DataFusionError;
+
+/// Errors raised by the Spark-compatible Parquet schema adapter.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum ParquetSchemaError {
+    /// Mirrors Spark's `_LEGACY_ERROR_TEMP_2093`: when reading in
+    /// case-insensitive mode, more than one Parquet field matches the
+    /// requested name.
+    DuplicateFieldCaseInsensitive {
+        required_field_name: String,
+        matched_fields: String,
+    },
+
+    /// Mirrors Spark's `_LEGACY_ERROR_TEMP_2094`: multiple Parquet fields
+    /// share the same field id when the read schema requested an id-based
+    /// lookup.
+    DuplicateFieldByFieldId {
+        required_id: i32,
+        matched_fields: String,
+    },
+
+    /// Mirrors the runtime error raised in Spark's `ParquetReadSupport`
+    /// when the Spark read schema requests Parquet field-id matching but the
+    /// file carries no field ids and `spark.sql.parquet.fieldId.read.ignoreMissing`
+    /// is `false`.
+    MissingFieldIds,
+
+    /// Schema mismatch when reading a Parquet column under a requested schema
+    /// that's incompatible with the physical column type. Mirrors Spark's
+    /// `SchemaColumnConvertNotSupportedException`. The `file_path` may be
+    /// empty when the rejection happens at planning time (the schema adapter
+    /// does not carry the file path).
+    SchemaConvert {
+        file_path: String,
+        column: String,
+        physical_type: String,
+        spark_type: String,
+    },
+}
+
+impl fmt::Display for ParquetSchemaError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::DuplicateFieldCaseInsensitive {
+                required_field_name,
+                matched_fields,
+            } => write!(
+                f,
+                "[_LEGACY_ERROR_TEMP_2093] Found duplicate field(s) \"{required_field_name}\": [{matched_fields}] in case-insensitive mode"
+            ),
+            Self::DuplicateFieldByFieldId {
+                required_id,
+                matched_fields,
+            } => write!(
+                f,
+                "[_LEGACY_ERROR_TEMP_2094] Found duplicate field(s) by id: id={required_id} matches [{matched_fields}] in id-lookup mode"
+            ),
+            Self::MissingFieldIds => write!(
+                f,
+                "Spark read schema expects field Ids, but Parquet file schema doesn't contain any field Ids. \
+                 Please remove the field ids from Spark schema or ignore missing ids by setting \
+                 `spark.sql.parquet.fieldId.read.ignoreMissing = true`"
+            ),
+            Self::SchemaConvert {
+                file_path,
+                column,
+                physical_type,
+                spark_type,
+            } => write!(
+                f,
+                "Parquet column cannot be converted in file {file_path}. \
+                 Column: [{column}], Expected: {spark_type}, Found: {physical_type}"
+            ),
+        }
+    }
+}
+
+impl Error for ParquetSchemaError {}
+
+impl From<ParquetSchemaError> for DataFusionError {
+    fn from(value: ParquetSchemaError) -> Self {
+        DataFusionError::External(Box::new(value))
+    }
+}
diff --git a/datafusion/spark/src/parquet/mod.rs b/datafusion/spark/src/parquet/mod.rs
new file mode 100644
index 0000000000000..35de1324c486c
--- /dev/null
+++ b/datafusion/spark/src/parquet/mod.rs
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Spark-compatible Parquet reading utilities.
+//!
+//! This module provides a [`PhysicalExprAdapterFactory`] implementation that,
+//! when plugged into a `FileScanConfig`, makes DataFusion read Parquet files
+//! with the same semantics as Apache Spark's vectorized reader.
+//!
+//! [`PhysicalExprAdapterFactory`]: datafusion_physical_expr_adapter::PhysicalExprAdapterFactory
+//!
+//!
# Quick start
+//!
+//! ```no_run
+//! use std::sync::Arc;
+//! use datafusion_spark::parquet::{
+//!     EvalMode, SparkParquetOptions, SparkPhysicalExprAdapterFactory,
+//! };
+//! use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
+//!
+//! // Configure Spark options. Most users want to start from the
+//! // constructor's defaults and override the per-Spark-version flags.
+//! let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
+//! // Spark 3.x rejects type promotion in the vectorized reader; flip this
+//! // to `true` to emulate Spark 4.x.
+//! options.allow_type_promotion = false;
+//!
+//! let factory: Arc<dyn PhysicalExprAdapterFactory> =
+//!     Arc::new(SparkPhysicalExprAdapterFactory::new(options, None));
+//!
+//! // Plug `factory` into a `FileScanConfigBuilder` via `with_expr_adapter`.
+//! ```
+//!
+//! # Per-Spark-version behavior
+//!
+//! Configure [`SparkParquetOptions`] flags to match the Spark version you are
+//! emulating. Notable version-sensitive switches:
+//!
+//! * [`SparkParquetOptions::allow_type_promotion`] — Spark 3.x's vectorized
+//!   reader rejects INT32→INT64, FLOAT→DOUBLE, INT32→DOUBLE; Spark 4.x
+//!   allows them.
+//! * [`SparkParquetOptions::return_null_struct_if_all_fields_missing`] —
+//!   pre-4.1 Spark returned the struct as `null` when no requested fields
+//!   matched; SPARK-53535 (Spark 4.1+) preserves the parent's nullness.
+//! * [`SparkParquetOptions::eval_mode`] — Spark 4.0 made `Ansi` the default.
+
+mod cast_column;
+mod error;
+mod options;
+mod parquet_support;
+mod schema_adapter;
+
+pub use cast_column::SparkCastColumnExpr;
+pub use error::ParquetSchemaError;
+pub use options::{EvalMode, SparkParquetOptions};
+pub use parquet_support::spark_parquet_convert;
+pub use schema_adapter::SparkPhysicalExprAdapterFactory;
diff --git a/datafusion/spark/src/parquet/options.rs b/datafusion/spark/src/parquet/options.rs
new file mode 100644
index 0000000000000..869d312d5d812
--- /dev/null
+++ b/datafusion/spark/src/parquet/options.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Options controlling Spark-compatible Parquet reading.
+//!
+//! See [`SparkParquetOptions`] for the full list of settings.
+
+/// Spark expression evaluation mode.
+///
+/// Spark supports three evaluation modes when evaluating expressions, which affect
+/// the behavior when processing input values that are invalid or would result in an
+/// error, such as divide-by-zero errors, and also affect behavior when converting
+/// between types.
+#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
+pub enum EvalMode {
+    /// Default behavior in Spark prior to Spark 4.0. Silently ignores or replaces
+    /// errors during SQL operations and enables implicit type conversions.
+    Legacy,
+    /// Adheres to the ANSI SQL standard for error handling. Throws exceptions for
+    /// operations that would otherwise silently succeed and does not perform
+    /// implicit type conversions.
+    Ansi,
+    /// Same as [`EvalMode::Ansi`] except that errors become NULL values rather than
+    /// failing the entire query.
+    Try,
+}
+
+/// Options controlling Spark-compatible Parquet type conversion.
+///
+/// Many of these mirror the equivalent Spark configuration entries:
+///
+/// * [`use_field_id`](Self::use_field_id) /
+///   [`ignore_missing_field_id`](Self::ignore_missing_field_id) mirror
+///   `spark.sql.parquet.fieldId.read.enabled` and
+///   `spark.sql.parquet.fieldId.read.ignoreMissing`.
+/// * [`return_null_struct_if_all_fields_missing`](Self::return_null_struct_if_all_fields_missing)
+///   mirrors `spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing`
+///   (SPARK-53535, Spark 4.1+).
+/// * [`allow_type_promotion`](Self::allow_type_promotion) tracks Spark's
+///   per-version vectorized-reader policy for widening reads (e.g. INT32 → INT64
+///   was rejected in Spark 3.x but is allowed in Spark 4.x).
+#[derive(Debug, Clone, Hash, PartialEq, Eq)]
+pub struct SparkParquetOptions {
+    /// Spark evaluation mode (forwarded to nested-type conversion).
+    pub eval_mode: EvalMode,
+    /// Session timezone, used when casting to/from timezone-related types.
+    // TODO: replace with `Tz` to avoid repeated parsing.
+    pub timezone: String,
+    /// Allow casts that are supported but not guaranteed to be 100% compatible
+    /// with Spark.
+    pub allow_incompat: bool,
+    /// Allow casting unsigned integers to signed integers (used by the
+    /// Parquet schema adapter for Iceberg / Arrow files that use unsigned types).
+    pub allow_cast_unsigned_ints: bool,
+    /// Whether to always represent decimals using 128 bits. If `false`, the
+    /// native reader may represent decimals using 32 or 64 bits depending on
+    /// the precision.
+    pub use_decimal_128: bool,
+    /// Whether to read dates/timestamps written in the legacy hybrid Julian +
+    /// Gregorian calendar without rebasing. If `false`, throw exceptions
+    /// instead. If the Spark type is `TimestampNTZ`, this should be `true`.
+    pub use_legacy_date_timestamp_or_ntz: bool,
+    /// Whether schema field names are matched case-sensitively.
+    pub case_sensitive: bool,
+    /// SPARK-53535 (Spark 4.1+): when reading a struct whose requested fields
+    /// are all missing in the Parquet file, `true` returns the entire struct
+    /// as null (pre-4.1 legacy behavior); `false` preserves the parent struct's
+    /// nullness from the file so non-null parents return a struct of all-null
+    /// fields.
+    pub return_null_struct_if_all_fields_missing: bool,
+    /// When `true`, resolve fields by `parquet.field.id` metadata instead of by
+    /// name (mirrors `spark.sql.parquet.fieldId.read.enabled`). Only takes
+    /// effect when both physical and logical fields actually carry IDs.
+    pub use_field_id: bool,
+    /// When `false` (Spark's default), reading a file that has no field ids
+    /// while the requested schema does carry ids raises a runtime error rather
+    /// than silently producing nulls (mirrors
+    /// `spark.sql.parquet.fieldId.read.ignoreMissing`).
+    pub ignore_missing_field_id: bool,
+    /// Whether type promotion (schema evolution) is allowed, e.g. INT32 ->
+    /// INT64, FLOAT -> DOUBLE. Spark 3.x rejects these in the vectorized
+    /// reader; Spark 4.x allows them.
+    pub allow_type_promotion: bool,
+}
+
+impl SparkParquetOptions {
+    /// Create a new [`SparkParquetOptions`] with the given evaluation mode,
+    /// timezone, and `allow_incompat` flag. Every other flag starts as `false` except
+    /// `return_null_struct_if_all_fields_missing` (`true`): Spark 3.x reader behavior.
+    pub fn new(eval_mode: EvalMode, timezone: &str, allow_incompat: bool) -> Self {
+        Self {
+            eval_mode,
+            timezone: timezone.to_string(),
+            allow_incompat,
+            allow_cast_unsigned_ints: false,
+            use_decimal_128: false,
+            use_legacy_date_timestamp_or_ntz: false,
+            case_sensitive: false,
+            return_null_struct_if_all_fields_missing: true,
+            use_field_id: false,
+            ignore_missing_field_id: false,
+            allow_type_promotion: false,
+        }
+    }
+
+    /// Create a new [`SparkParquetOptions`] whose `timezone` is the empty string.
+    /// Useful for conversions that do not involve timezone-aware types.
+    pub fn new_without_timezone(eval_mode: EvalMode, allow_incompat: bool) -> Self {
+        Self::new(eval_mode, "", allow_incompat)
+    }
+}
diff --git a/datafusion/spark/src/parquet/parquet_support.rs b/datafusion/spark/src/parquet/parquet_support.rs
new file mode 100644
index 0000000000000..7b81f2de4a773
--- /dev/null
+++ b/datafusion/spark/src/parquet/parquet_support.rs
@@ -0,0 +1,359 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Spark-compatible Parquet conversion routines.
+//!
+//! [`spark_parquet_convert`] mirrors Spark's vectorized Parquet reader's
+//! per-column type conversion.
It is invoked by [`crate::parquet::cast_column::SparkCastColumnExpr`]
+//! to handle nested types (Struct, List, Map) and the few primitive
+//! conversions that Arrow's `cast` does not get right for Spark semantics
+//! (e.g. INT96 timezone relabeling, FixedSizeBinary(16) UUID rendering).
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayRef, DictionaryArray, FixedSizeBinaryArray, ListArray, MapArray,
+    StringArray, StructArray, cast::AsArray, new_null_array, types::Int32Type,
+    types::TimestampMicrosecondType,
+};
+use arrow::buffer::NullBuffer;
+use arrow::compute::{CastOptions, can_cast_types, cast_with_options, take};
+use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
+use arrow::util::display::FormatOptions;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr_common::columnar_value::ColumnarValue;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use super::options::SparkParquetOptions;
+
+const TIMESTAMP_FORMAT: Option<&str> = Some("%Y-%m-%d %H:%M:%S%.f");
+
+const PARQUET_CAST_OPTIONS: CastOptions = CastOptions {
+    safe: true,
+    format_options: FormatOptions::new()
+        .with_timestamp_tz_format(TIMESTAMP_FORMAT)
+        .with_timestamp_format(TIMESTAMP_FORMAT),
+};
+
+/// Spark-compatible Parquet conversion. Defers to Arrow's `cast` where that is
+/// known to be compatible with Spark, and applies custom logic for the cases
+/// where Spark's vectorized Parquet reader differs (struct field selection,
+/// list/map adaptation, INT96 timezone handling, FixedSizeBinary(16) UUIDs).
+pub fn spark_parquet_convert(
+    arg: ColumnarValue,
+    data_type: &DataType,
+    parquet_options: &SparkParquetOptions,
+) -> Result<ColumnarValue> {
+    match arg {
+        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(parquet_convert_array(
+            array,
+            data_type,
+            parquet_options,
+        )?)),
+        ColumnarValue::Scalar(scalar) => {
+            // Normally a CAST(scalar) is folded on the Spark JVM side. However,
+            // for some cases (e.g. scalar subqueries) Spark does not fold the
+            // cast, so we have to handle it here.
+            let array = scalar.to_array()?;
+            let scalar = ScalarValue::try_from_array(
+                &parquet_convert_array(array, data_type, parquet_options)?,
+                0,
+            )?;
+            Ok(ColumnarValue::Scalar(scalar))
+        }
+    }
+}
+
+fn parquet_convert_array(
+    array: ArrayRef,
+    to_type: &DataType,
+    parquet_options: &SparkParquetOptions,
+) -> Result<ArrayRef> {
+    use DataType::*;
+    let from_type = array.data_type().clone();
+
+    // Strip dictionary encoding before converting; the result is rebuilt as a
+    // dictionary if the target type is itself dictionary-encoded.
+    let array = match &from_type {
+        Dictionary(key_type, value_type)
+            if key_type.as_ref() == &Int32
+                && (value_type.as_ref() == &Utf8
+                    || value_type.as_ref() == &LargeUtf8) =>
+        {
+            let dict_array = array
+                .as_any()
+                .downcast_ref::<DictionaryArray<Int32Type>>()
+                .expect("Expected a dictionary array");
+
+            let casted_dictionary = DictionaryArray::<Int32Type>::new(
+                dict_array.keys().clone(),
+                parquet_convert_array(
+                    Arc::clone(dict_array.values()),
+                    to_type,
+                    parquet_options,
+                )?,
+            );
+
+            let casted_result = match to_type {
+                Dictionary(_, _) => Arc::new(casted_dictionary.clone()),
+                _ => take(casted_dictionary.values().as_ref(), dict_array.keys(), None)?,
+            };
+            return Ok(casted_result);
+        }
+        _ => array,
+    };
+    let from_type = array.data_type();
+
+    match (from_type, to_type) {
+        (Struct(_), Struct(_)) => parquet_convert_struct_to_struct(
+            array.as_struct(),
+            from_type,
+            to_type,
+            parquet_options,
+        ),
+        (List(_), List(to_inner_type)) => {
+            let list_arr: &ListArray = array.as_list();
+            let cast_field = parquet_convert_array(
+                Arc::clone(list_arr.values()),
+                to_inner_type.data_type(),
+                parquet_options,
+            )?;
+
+            Ok(Arc::new(ListArray::new(
+                Arc::clone(to_inner_type),
+                list_arr.offsets().clone(),
+                cast_field,
+                list_arr.nulls().cloned(),
+            )))
+        }
+        // INT96 columns surface as `Timestamp(Microsecond, None)` after the
+        // first arrow-rs read pass, but Spark stores them as
+        // `Timestamp(Microsecond, Some("UTC"))`. The values are identical;
+        // only the timezone metadata is different, so reinterpret rather
+        // than convert.
+        (
+            Timestamp(TimeUnit::Microsecond, None),
+            Timestamp(TimeUnit::Microsecond, Some(tz)),
+        ) => Ok(Arc::new(
+            array
+                .as_primitive::<TimestampMicrosecondType>()
+                .reinterpret_cast::<TimestampMicrosecondType>()
+                .with_timezone(Arc::clone(tz)),
+        )),
+        (Map(_, ordered_from), Map(_, ordered_to)) if ordered_from == ordered_to => {
+            parquet_convert_map_to_map(
+                array.as_map(),
+                to_type,
+                parquet_options,
+                *ordered_to,
+            )
+        }
+        // Iceberg stores UUIDs as 16-byte fixed binary but Spark surfaces them
+        // as their string representation. Arrow does not support casting
+        // FixedSizeBinary to Utf8, so we render the UUIDs ourselves.
+        (FixedSizeBinary(16), Utf8) => {
+            let binary_array = array
+                .as_any()
+                .downcast_ref::<FixedSizeBinaryArray>()
+                .expect("Expected a FixedSizeBinaryArray");
+
+            let string_array: StringArray = binary_array
+                .iter()
+                .map(|opt_bytes| {
+                    opt_bytes.map(|bytes| {
+                        let uuid = uuid::Uuid::from_bytes(
+                            bytes.try_into().expect("Expected 16 bytes"),
+                        );
+                        uuid.to_string()
+                    })
+                })
+                .collect();
+
+            Ok(Arc::new(string_array))
+        }
+        // If Arrow cast supports the cast, delegate to it.
+        _ if can_cast_types(from_type, to_type) => {
+            Ok(cast_with_options(&array, to_type, &PARQUET_CAST_OPTIONS)?)
+        }
+        _ => Ok(array),
+    }
+}
+
+/// Read the Parquet field id from arrow-rs's `PARQUET_FIELD_ID_META_KEY`.
+fn field_id(field: &Field) -> Option<i32> {
+    field
+        .metadata()
+        .get(PARQUET_FIELD_ID_META_KEY)
+        .and_then(|v| v.parse::<i32>().ok())
+}
+
+/// Cast between struct types based on logic in
+/// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`.
+fn parquet_convert_struct_to_struct(
+    array: &StructArray,
+    from_type: &DataType,
+    to_type: &DataType,
+    parquet_options: &SparkParquetOptions,
+) -> Result<ArrayRef> {
+    let (DataType::Struct(from_fields), DataType::Struct(to_fields)) =
+        (from_type, to_type)
+    else {
+        return Err(DataFusionError::Internal(format!(
+            "parquet_convert_struct_to_struct expected struct types, got {from_type} -> {to_type}"
+        )));
+    };
+
+    // Match `from` (file) fields to `to` (logical) fields. Mirrors Spark's
+    // `clipParquetGroupFields`: when the logical struct carries Parquet field IDs
+    // anywhere, ID-bearing logical fields match ONLY by ID; non-ID-bearing fields
+    // fall back to name match. When no logical field carries an ID, fall back to
+    // name match across the board.
+    let should_match_by_id =
+        parquet_options.use_field_id && to_fields.iter().any(|f| field_id(f).is_some());
+
+    let from_id_to_index: HashMap<i32, usize> = if should_match_by_id {
+        let mut map = HashMap::new();
+        for (i, field) in from_fields.iter().enumerate() {
+            if let Some(id) = field_id(field) {
+                map.entry(id).or_insert(i);
+            }
+        }
+        map
+    } else {
+        HashMap::new()
+    };
+
+    let normalize_name = |name: &str| -> String {
+        if parquet_options.case_sensitive {
+            name.to_string()
+        } else {
+            name.to_lowercase()
+        }
+    };
+    let mut field_name_to_index_map = HashMap::new();
+    for (i, field) in from_fields.iter().enumerate() {
+        field_name_to_index_map.insert(normalize_name(field.name()), i);
+    }
+
+    let mut field_overlap = false;
+    let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len());
+    for to_field in to_fields.iter() {
+        let from_index = match (should_match_by_id, field_id(to_field)) {
+            // Spark treats a missing ID match as a missing column rather than
+            // falling back to name match.
+            (true, Some(id)) => from_id_to_index.get(&id).copied(),
+            _ => field_name_to_index_map
+                .get(&normalize_name(to_field.name()))
+                .copied(),
+        };
+
+        if let Some(from_index) = from_index {
+            cast_fields.push(parquet_convert_array(
+                Arc::clone(array.column(from_index)),
+                to_field.data_type(),
+                parquet_options,
+            )?);
+            field_overlap = true;
+        } else {
+            cast_fields.push(new_null_array(to_field.data_type(), array.len()));
+        }
+    }
+
+    // When the file's struct contains none of the requested fields, the
+    // returned validity buffer depends on Spark's
+    // `spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing` (SPARK-53535,
+    // Spark 4.1+). Legacy mode marks the whole column null; the new default
+    // preserves the file's parent-row nullness so non-null parents materialize
+    // as a struct of all-null fields.
+    let nulls =
+        if !field_overlap && parquet_options.return_null_struct_if_all_fields_missing {
+            Some(NullBuffer::new_null(array.len()))
+        } else {
+            array.nulls().cloned()
+        };
+
+    Ok(Arc::new(StructArray::new(
+        to_fields.clone(),
+        cast_fields,
+        nulls,
+    )))
+}
+
+/// Cast a map type to another map type. Recursively calls
+/// [`parquet_convert_array`] for the keys and values rather than relying on
+/// Arrow's cast, so nested type adaptations propagate.
+fn parquet_convert_map_to_map(
+    from: &MapArray,
+    to_data_type: &DataType,
+    parquet_options: &SparkParquetOptions,
+    to_ordered: bool,
+) -> Result<ArrayRef> {
+    let DataType::Map(entries_field, _) = to_data_type else {
+        return Err(DataFusionError::Internal(format!(
+            "Expected MapType. Got: {to_data_type}"
+        )));
+    };
+
+    let key_field = key_field(entries_field).ok_or_else(|| {
+        DataFusionError::Internal("map is missing key field".to_string())
+    })?;
+    let value_field = value_field(entries_field).ok_or_else(|| {
+        DataFusionError::Internal("map is missing value field".to_string())
+    })?;
+
+    let key_array = parquet_convert_array(
+        Arc::clone(from.keys()),
+        key_field.data_type(),
+        parquet_options,
+    )?;
+    let value_array = parquet_convert_array(
+        Arc::clone(from.values()),
+        value_field.data_type(),
+        parquet_options,
+    )?;
+
+    Ok(Arc::new(MapArray::new(
+        Arc::clone(entries_field),
+        from.offsets().clone(),
+        StructArray::new(
+            Fields::from(vec![key_field, value_field]),
+            vec![key_array, value_array],
+            from.entries().nulls().cloned(),
+        ),
+        from.nulls().cloned(),
+        to_ordered,
+    )))
+}
+
+/// Returns the key field from a map's entries struct.
+fn key_field(entries_field: &FieldRef) -> Option<FieldRef> {
+    if let DataType::Struct(fields) = entries_field.data_type() {
+        fields.first().cloned()
+    } else {
+        None
+    }
+}
+
+/// Returns the value field from a map's entries struct.
+fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
+    if let DataType::Struct(fields) = entries_field.data_type() {
+        fields.get(1).cloned()
+    } else {
+        None
+    }
+}
diff --git a/datafusion/spark/src/parquet/schema_adapter.rs b/datafusion/spark/src/parquet/schema_adapter.rs
new file mode 100644
index 0000000000000..43fbc8dae85bf
--- /dev/null
+++ b/datafusion/spark/src/parquet/schema_adapter.rs
@@ -0,0 +1,1728 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Spark-compatible Parquet schema adapter. +//! +//! [`SparkPhysicalExprAdapterFactory`] implements +//! [`PhysicalExprAdapterFactory`] so it can be plugged into a +//! [`FileScanConfig`] via `with_expr_adapter`. The resulting adapter rewrites +//! physical expressions at planning time so that column references against +//! the logical (query) schema resolve correctly to the physical (file) +//! schema, while preserving Spark's vectorized-reader semantics: +//! +//! * case-insensitive name matching with duplicate detection (Spark's +//! `_LEGACY_ERROR_TEMP_2093`), +//! * Parquet field-id matching with duplicate detection +//! (`_LEGACY_ERROR_TEMP_2094`) and the missing-field-id runtime error, +//! * type-promotion rejection rules that mirror +//! `ParquetVectorUpdaterFactory.getUpdater` in Spark, deferred to runtime +//! when necessary so empty Parquet files (SPARK-26709) still pass, +//! * default values for columns that are missing from the file schema, and +//! * Spark-compatible casts via [`SparkCastColumnExpr`] for nested types. +//! +//! 
[`FileScanConfig`]: https://docs.rs/datafusion-datasource/latest/datafusion_datasource/file_scan_config/struct.FileScanConfig.html
+
+use std::collections::{HashMap, HashSet};
+use std::fmt::{self, Display};
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+use arrow::array::new_empty_array;
+use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr_common::columnar_value::ColumnarValue;
+use datafusion_physical_expr::expressions::{CastExpr, Column};
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+    replace_columns_with_literals,
+};
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use super::cast_column::SparkCastColumnExpr;
+use super::error::ParquetSchemaError;
+use super::options::SparkParquetOptions;
+
+/// Factory that creates Spark-compatible [`PhysicalExprAdapter`] instances.
+///
+/// Plug this into a `FileScanConfig` via `with_expr_adapter` to read Parquet
+/// files with Spark's vectorized-reader semantics.
+#[derive(Clone, Debug)]
+pub struct SparkPhysicalExprAdapterFactory {
+    /// Spark-specific Parquet options for type conversions.
+    parquet_options: SparkParquetOptions,
+    /// Default values for columns that may be missing from the physical
+    /// schema. The key is a `Column` (containing name and index).
+    default_values: Option<HashMap<Column, ScalarValue>>,
+}
+
+impl SparkPhysicalExprAdapterFactory {
+    /// Create a new factory with the given options and optional default
+    /// values for missing columns.
+    pub fn new(
+        parquet_options: SparkParquetOptions,
+        default_values: Option<HashMap<Column, ScalarValue>>,
+    ) -> Self {
+        Self {
+            parquet_options,
+            default_values,
+        }
+    }
+}
+
+/// Read the Parquet field id stored under arrow-rs's
+/// `PARQUET_FIELD_ID_META_KEY`.
+fn parse_field_id(field: &Field) -> Option<i32> {
+    field
+        .metadata()
+        .get(PARQUET_FIELD_ID_META_KEY)
+        .and_then(|v| v.parse::<i32>().ok())
+}
+
+fn schema_has_field_ids(schema: &SchemaRef) -> bool {
+    schema.fields().iter().any(|f| parse_field_id(f).is_some())
+}
+
+/// Remap physical schema field names to match logical schema field names.
+/// Mirrors Spark's `clipParquetGroupFields`: prefer ID match for any logical
+/// field that carries a `PARQUET:field_id`; fall back to case-insensitive
+/// name match otherwise.
+///
+/// The remap only changes top-level field NAMES so that
+/// [`DefaultPhysicalExprAdapter`]'s exact-name lookup hits. Indices, types,
+/// nullability, and metadata stay as in the file. Returns the rewritten
+/// schema and a `logical_name -> original_physical_name` map used downstream
+/// to restore the original physical names before stream consumption.
+///
+/// [`DefaultPhysicalExprAdapter`]: datafusion_physical_expr_adapter::DefaultPhysicalExprAdapter
+fn remap_physical_schema(
+    logical_schema: &SchemaRef,
+    physical_schema: &SchemaRef,
+    case_sensitive: bool,
+    use_field_id: bool,
+    ignore_missing_field_id: bool,
+) -> Result<(SchemaRef, HashMap<String, String>)> {
+    let should_match_by_id = use_field_id && schema_has_field_ids(logical_schema);
+
+    if should_match_by_id
+        && !ignore_missing_field_id
+        && !schema_has_field_ids(physical_schema)
+    {
+        // Mirrors `ParquetReadSupport.inferSchema`'s eager check (Spark throws
+        // a runtime error rather than silently returning null columns).
+        return Err(ParquetSchemaError::MissingFieldIds.into());
+    }
+
+    // Build id -> all matching physical field names. We need the full list so
+    // we can mirror Spark's `_LEGACY_ERROR_TEMP_2094` "Found duplicate field(s)"
+    // error when an ID-bearing logical field would resolve to more than one
+    // physical field.
+    let mut id_to_phys_names: HashMap<i32, Vec<String>> = HashMap::new();
+    if should_match_by_id {
+        for pf in physical_schema.fields() {
+            if let Some(id) = parse_field_id(pf) {
+                id_to_phys_names
+                    .entry(id)
+                    .or_default()
+                    .push(pf.name().clone());
+            }
+        }
+        for lf in logical_schema.fields() {
+            if let Some(id) = parse_field_id(lf)
+                && let Some(matches) = id_to_phys_names.get(&id)
+                && matches.len() > 1
+            {
+                return Err(ParquetSchemaError::DuplicateFieldByFieldId {
+                    required_id: id,
+                    matched_fields: matches.join(", "),
+                }
+                .into());
+            }
+        }
+    }
+
+    // Pre-build id -> first matching logical field for the per-physical
+    // rename pass below.
+    let id_to_logical: HashMap<i32, &FieldRef> = if should_match_by_id {
+        let mut map = HashMap::new();
+        for lf in logical_schema.fields() {
+            if let Some(id) = parse_field_id(lf) {
+                map.entry(id).or_insert(lf);
+            }
+        }
+        map
+    } else {
+        HashMap::new()
+    };
+
+    // Names of ID-bearing logical fields whose ID is not present in the file.
+    // Any physical field that shares one of these names must be renamed to
+    // something the `DefaultPhysicalExprAdapter` cannot name-match, otherwise
+    // the read would silently fall through to a name match. Spark's
+    // `matchIdField` solves the same problem with `generateFakeColumnName`
+    // (see `ParquetReadSupport.scala`).
+    let unmatched_id_logical_names: HashSet<String> = if should_match_by_id {
+        logical_schema
+            .fields()
+            .iter()
+            .filter_map(|lf| {
+                parse_field_id(lf).and_then(|id| {
+                    if id_to_phys_names.contains_key(&id) {
+                        None
+                    } else {
+                        Some(lf.name().clone())
+                    }
+                })
+            })
+            .collect()
+    } else {
+        HashSet::new()
+    };
+    let mut fake_counter: usize = 0;
+
+    let mut name_map: HashMap<String, String> = HashMap::new();
+    let remapped_fields: Vec<FieldRef> = physical_schema
+        .fields()
+        .iter()
+        .map(|field| {
+            // ID match first when the logical schema is ID-bearing.
+            if should_match_by_id
+                && let Some(phys_id) = parse_field_id(field)
+                && let Some(logical_field) = id_to_logical.get(&phys_id)
+            {
+                if logical_field.name() != field.name() {
+                    name_map.insert(logical_field.name().clone(), field.name().clone());
+                    return Arc::new(
+                        Field::new(
+                            logical_field.name(),
+                            field.data_type().clone(),
+                            field.is_nullable(),
+                        )
+                        .with_metadata(field.metadata().clone()),
+                    );
+                }
+                return Arc::clone(field);
+            }
+
+            // Block accidental name match for ID-bearing logical fields whose
+            // ID is missing from the file. Mirrors Spark's
+            // `generateFakeColumnName` in `matchIdField`.
+            if should_match_by_id
+                && unmatched_id_logical_names
+                    .iter()
+                    .any(|name| name.eq_ignore_ascii_case(field.name()))
+            {
+                fake_counter += 1;
+                let fake_name = format!("__datafusion_unmatched_field_id_{fake_counter}");
+                return Arc::new(
+                    Field::new(fake_name, field.data_type().clone(), field.is_nullable())
+                        .with_metadata(field.metadata().clone()),
+                );
+            }
+
+            // Name match. Spark's `matchIdField` does not fall through to a
+            // name match for ID-bearing logical fields, so skip those when
+            // the schema is ID-bearing.
+            if !case_sensitive {
+                let logical_field = logical_schema.fields().iter().find(|lf| {
+                    let lf_has_id = should_match_by_id && parse_field_id(lf).is_some();
+                    !lf_has_id && lf.name().eq_ignore_ascii_case(field.name())
+                });
+                if let Some(logical_field) = logical_field
+                    && logical_field.name() != field.name()
+                {
+                    name_map.insert(logical_field.name().clone(), field.name().clone());
+                    return Arc::new(
+                        Field::new(
+                            logical_field.name(),
+                            field.data_type().clone(),
+                            field.is_nullable(),
+                        )
+                        .with_metadata(field.metadata().clone()),
+                    );
+                }
+            }
+
+            Arc::clone(field)
+        })
+        .collect();
+
+    Ok((Arc::new(Schema::new(remapped_fields)), name_map))
+}
+
+/// Format an Arrow `DataType` as Spark's catalog string (e.g. `Int64` ->
+/// `bigint`), so `SchemaColumnConvertNotSupportedException` messages match
+/// Spark's vectorized reader.
+fn spark_catalog_name(dt: &DataType) -> String {
+    match dt {
+        DataType::Boolean => "boolean".to_string(),
+        DataType::Int8 => "tinyint".to_string(),
+        DataType::Int16 => "smallint".to_string(),
+        DataType::Int32 => "int".to_string(),
+        DataType::Int64 => "bigint".to_string(),
+        DataType::Float32 => "float".to_string(),
+        DataType::Float64 => "double".to_string(),
+        DataType::Utf8 | DataType::LargeUtf8 => "string".to_string(),
+        DataType::Binary | DataType::LargeBinary => "binary".to_string(),
+        DataType::Date32 => "date".to_string(),
+        DataType::Timestamp(_, Some(_)) => "timestamp".to_string(),
+        DataType::Timestamp(_, None) => "timestamp_ntz".to_string(),
+        DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => {
+            format!("decimal({p},{s})")
+        }
+        _ => "unknown".to_string(),
+    }
+}
+
+/// Format an Arrow `DataType` as the Parquet primitive type name (e.g.
+/// `Int64` -> `INT64`, matching `PrimitiveTypeName.toString()` in parquet-mr).
+fn parquet_primitive_name(dt: &DataType) -> &'static str {
+    match dt {
+        DataType::Boolean => "BOOLEAN",
+        DataType::Int8 | DataType::Int16 | DataType::Int32 => "INT32",
+        DataType::Int64 => "INT64",
+        DataType::Float32 => "FLOAT",
+        DataType::Float64 => "DOUBLE",
+        DataType::Utf8
+        | DataType::LargeUtf8
+        | DataType::Binary
+        | DataType::LargeBinary => "BINARY",
+        // Spark stores DATE as INT32 with a DATE logical-type annotation.
+        DataType::Date32 => "INT32",
+        // Spark stores TIMESTAMP as INT64 with a timestamp annotation, or as
+        // INT96 (legacy nanos). arrow-rs surfaces both as `Timestamp`;
+        // without the original physical name we report INT64, which matches
+        // the common case.
+        DataType::Timestamp(_, _) => "INT64",
+        // Mirror Spark's `SparkToParquetSchemaConverter` decimal mapping:
+        // precision 1-9 -> INT32, 10-18 -> INT64, 19+ -> FIXED_LEN_BYTE_ARRAY.
+        DataType::Decimal128(p, _) | DataType::Decimal256(p, _) => {
+            if *p <= 9 {
+                "INT32"
+            } else if *p <= 18 {
+                "INT64"
+            } else {
+                "FIXED_LEN_BYTE_ARRAY"
+            }
+        }
+        _ => "UNKNOWN",
+    }
+}
+
+fn is_string_or_binary(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary
+    )
+}
+
+/// Build a Spark-shaped `SchemaColumnConvertNotSupportedException` carrier
+/// for a rejected Parquet -> Spark conversion. The bracketed column wrapping
+/// mirrors `Arrays.toString(descriptor.getPath())` in Spark's vectorized
+/// reader.
+fn parquet_schema_convert_err(
+    field_name: &str,
+    physical_type: &DataType,
+    target_type: &DataType,
+) -> DataFusionError {
+    ParquetSchemaError::SchemaConvert {
+        file_path: String::new(),
+        column: format!("[{field_name}]"),
+        physical_type: parquet_primitive_name(physical_type).to_string(),
+        spark_type: spark_catalog_name(target_type),
+    }
+    .into()
+}
+
+/// Build a [`RejectOnNonEmpty`] expr wrapping `child`.
The rejection fires
+/// only when the input batch is non-empty (mirrors Spark's per-row-group
+/// check).
+fn reject_on_non_empty_expr(
+    child: Arc<dyn PhysicalExpr>,
+    target_field: &FieldRef,
+    field_name: &str,
+    physical_type: &DataType,
+    target_type: &DataType,
+) -> Arc<dyn PhysicalExpr> {
+    Arc::new(RejectOnNonEmpty {
+        child,
+        target_field: Arc::clone(target_field),
+        column: format!("[{field_name}]"),
+        physical_type: parquet_primitive_name(physical_type).to_string(),
+        spark_type: spark_catalog_name(target_type),
+    })
+}
+
+/// Check if a specific column name has duplicate matches in the physical
+/// schema (case-insensitive). Returns the error info if so.
+fn check_column_duplicate(
+    col_name: &str,
+    physical_schema: &SchemaRef,
+) -> Option<(String, String)> {
+    let matches: Vec<&str> = physical_schema
+        .fields()
+        .iter()
+        .filter(|pf| pf.name().eq_ignore_ascii_case(col_name))
+        .map(|pf| pf.name().as_str())
+        .collect();
+    if matches.len() > 1 {
+        // Include brackets to match the format expected by Spark's shim.
+        Some((col_name.to_string(), format!("[{}]", matches.join(", "))))
+    } else {
+        None
+    }
+}
+
+impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
+    fn create(
+        &self,
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
+    ) -> Result<Arc<dyn PhysicalExprAdapter>> {
+        // Remap physical schema field names to match logical names by Parquet
+        // field id (when the logical schema carries IDs and `use_field_id` is
+        // set) and/or by case-insensitive name match. The
+        // `DefaultPhysicalExprAdapter` uses exact name matching, so without
+        // this remapping, columns whose file names differ from the logical
+        // names won't match and will be filled with NULLs.
+        //
+        // We also keep a reverse map (logical name -> original physical name)
+        // so that after the default adapter produces expressions, we can
+        // remap column names back to the original physical names. This is
+        // necessary because downstream code looks up columns by name in the
+        // actual stream schema, which uses the original physical file column
+        // names.
+        let needs_remap = !self.parquet_options.case_sensitive
+            || (self.parquet_options.use_field_id
+                && schema_has_field_ids(&logical_file_schema));
+        let (
+            adapted_physical_schema,
+            logical_to_physical_names,
+            original_physical_schema,
+        ) = if needs_remap {
+            let (remapped, logical_to_physical) = remap_physical_schema(
+                &logical_file_schema,
+                &physical_file_schema,
+                self.parquet_options.case_sensitive,
+                self.parquet_options.use_field_id,
+                self.parquet_options.ignore_missing_field_id,
+            )?;
+            (
+                remapped,
+                if logical_to_physical.is_empty() {
+                    None
+                } else {
+                    Some(logical_to_physical)
+                },
+                // Keep the original physical schema for per-column
+                // duplicate detection. Only meaningful in
+                // case-insensitive mode (matches existing behavior).
+                if !self.parquet_options.case_sensitive {
+                    Some(Arc::clone(&physical_file_schema))
+                } else {
+                    None
+                },
+            )
+        } else {
+            (Arc::clone(&physical_file_schema), None, None)
+        };
+
+        let default_factory = DefaultPhysicalExprAdapterFactory;
+        let default_adapter = default_factory.create(
+            Arc::clone(&logical_file_schema),
+            Arc::clone(&adapted_physical_schema),
+        )?;
+
+        Ok(Arc::new(SparkPhysicalExprAdapter {
+            logical_file_schema,
+            physical_file_schema: adapted_physical_schema,
+            parquet_options: self.parquet_options.clone(),
+            default_values: self.default_values.clone(),
+            default_adapter,
+            logical_to_physical_names,
+            original_physical_schema,
+        }))
+    }
+}
+
+/// Spark-compatible physical expression adapter.
+///
+/// Created by [`SparkPhysicalExprAdapterFactory::create`]. Rewrites
+/// expressions at planning time to:
+///
+/// 1. replace references to missing columns with default values (or NULLs),
+/// 2. apply Spark-compatible type-promotion rejection rules,
+/// 3.
wrap nested-type casts with [`SparkCastColumnExpr`] for Spark-compatible
+///    conversion, and
+/// 4. handle case-insensitive / field-id column matching.
+#[derive(Debug)]
+struct SparkPhysicalExprAdapter {
+    /// The logical schema expected by the query.
+    logical_file_schema: SchemaRef,
+    /// The physical schema of the actual file being read (after remapping for
+    /// field-id and case-insensitive matching).
+    physical_file_schema: SchemaRef,
+    /// Spark-specific options for type conversions.
+    parquet_options: SparkParquetOptions,
+    /// Default values for missing columns (keyed by `Column`).
+    default_values: Option<HashMap<Column, ScalarValue>>,
+    /// The default DataFusion adapter to delegate standard handling to.
+    default_adapter: Arc<dyn PhysicalExprAdapter>,
+    /// Mapping from logical column names to original physical column names,
+    /// used in case-insensitive mode where names differ in casing. After the
+    /// default adapter rewrites expressions using the remapped physical
+    /// schema (with logical names), we need to restore the original physical
+    /// names so that downstream code can find columns in the actual stream
+    /// schema.
+    logical_to_physical_names: Option<HashMap<String, String>>,
+    /// The original (un-remapped) physical schema, kept for per-column
+    /// duplicate detection in case-insensitive mode. Only set when
+    /// `!case_sensitive`.
+    original_physical_schema: Option<SchemaRef>,
+}
+
+impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+        // In case-insensitive mode, check if any Column in this expression
+        // references a field with multiple case-insensitive matches in the
+        // physical schema. Only the columns actually referenced trigger the
+        // error (not the whole schema).
+        if let Some(orig_physical) = &self.original_physical_schema {
+            let mut duplicate_err: Option<DataFusionError> = None;
+            let _ = Arc::<dyn PhysicalExpr>::clone(&expr).transform(|e| {
+                if let Some(col) = e.downcast_ref::<Column>()
+                    && let Some((req, matched)) =
+                        check_column_duplicate(col.name(), orig_physical)
+                {
+                    duplicate_err = Some(
+                        ParquetSchemaError::DuplicateFieldCaseInsensitive {
+                            required_field_name: req,
+                            matched_fields: matched,
+                        }
+                        .into(),
+                    );
+                }
+                Ok(Transformed::no(e))
+            });
+            if let Some(err) = duplicate_err {
+                return Err(err);
+            }
+        }
+
+        // First let the default adapter handle column remapping, missing
+        // columns, and simple scalar type casts. Then replace DataFusion's
+        // CastExpr (when it wraps a Column reference, i.e. came from the
+        // default adapter's schema-mismatch handling) with either a
+        // SparkCastColumnExpr (for nested types) or kept as-is for primitives,
+        // applying the rejection rules first.
+        //
+        // The default adapter may fail for complex nested type casts (List,
+        // Map). In that case, fall back to wrapping everything ourselves.
+        let expr = self.replace_missing_with_defaults(expr)?;
+        let expr = match self.default_adapter.rewrite(Arc::clone(&expr)) {
+            Ok(rewritten) => rewritten
+                .transform(|e| self.handle_schema_mismatch_cast(e))
+                .data()?,
+            Err(e) => {
+                // Default adapter failed (likely complex nested type cast).
+                // Handle all type mismatches ourselves using
+                // `spark_parquet_convert`.
+                log::debug!("Default schema adapter error: {e}");
+                self.wrap_all_type_mismatches(expr)?
+            }
+        };
+
+        // For case-insensitive mode: remap column names from logical back to
+        // original physical names. The default adapter was given a remapped
+        // physical schema (with logical names) so it could find columns. But
+        // downstream code looks up columns by name in the actual parquet
+        // stream schema, which uses the original physical names.
+        let expr = if let Some(name_map) = &self.logical_to_physical_names {
+            expr.transform(|e| {
+                if let Some(col) = e.downcast_ref::<Column>()
+                    && let Some(physical_name) = name_map.get(col.name())
+                {
+                    return Ok(Transformed::yes(Arc::new(Column::new(
+                        physical_name,
+                        col.index(),
+                    ))));
+                }
+                Ok(Transformed::no(e))
+            })
+            .data()?
+        } else {
+            expr
+        };
+
+        Ok(expr)
+    }
+}
+
+impl SparkPhysicalExprAdapter {
+    /// Wrap ALL Column expressions that have type mismatches with
+    /// [`SparkCastColumnExpr`]. This is the fallback path when the default
+    /// adapter fails (e.g., for complex nested type casts like List<Struct>
+    /// or Map). Uses [`spark_parquet_convert`] under the hood for the actual
+    /// type conversion.
+    ///
+    /// [`spark_parquet_convert`]: super::parquet_support::spark_parquet_convert
+    fn wrap_all_type_mismatches(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        expr.transform(|e| {
+            if let Some(column) = e.downcast_ref::<Column>() {
+                let col_name = column.name();
+
+                // Resolve fields by name because this is the fallback path
+                // that runs on the original expression when the default
+                // adapter fails. The original expression was built against
+                // the required (pruned) schema, so column indices refer to
+                // that schema — not the logical or physical file schemas.
+                // DataFusion's `DefaultPhysicalExprAdapter::resolve_physical_column`
+                // also resolves by name for the same reason.
+ let logical_field = if self.parquet_options.case_sensitive { + self.logical_file_schema + .fields() + .iter() + .find(|f| f.name() == col_name) + } else { + self.logical_file_schema + .fields() + .iter() + .find(|f| f.name().eq_ignore_ascii_case(col_name)) + }; + let physical_field = if self.parquet_options.case_sensitive { + self.physical_file_schema + .fields() + .iter() + .find(|f| f.name() == col_name) + } else { + self.physical_file_schema + .fields() + .iter() + .find(|f| f.name().eq_ignore_ascii_case(col_name)) + }; + + // Remap the column index to the physical file schema so + // downstream evaluation reads the correct column from the + // parquet batch. + let physical_index = if self.parquet_options.case_sensitive { + self.physical_file_schema.index_of(col_name).ok() + } else { + self.physical_file_schema + .fields() + .iter() + .position(|f| f.name().eq_ignore_ascii_case(col_name)) + }; + + if let (Some(logical_field), Some(physical_field), Some(phys_idx)) = + (logical_field, physical_field, physical_index) + { + let remapped: Arc = if column.index() != phys_idx { + Arc::new(Column::new(col_name, phys_idx)) + } else { + Arc::clone(&e) + }; + + if logical_field.data_type() != physical_field.data_type() { + // Mirror the same string/binary -> non-string/binary + // rejection in `handle_schema_mismatch_cast`; this + // branch is reached when the default adapter rejected + // the cast and we'd otherwise build a + // SparkCastColumnExpr that can't actually convert + // (e.g. BINARY -> DECIMAL with no + // `DecimalLogicalTypeAnnotation`). 
+ let physical_type = physical_field.data_type(); + let target_type = logical_field.data_type(); + if is_string_or_binary(physical_type) + && !is_string_or_binary(target_type) + { + return Err(parquet_schema_convert_err( + physical_field.name(), + physical_type, + target_type, + )); + } + + let cast_expr: Arc = Arc::new( + SparkCastColumnExpr::new( + remapped, + Arc::clone(physical_field), + Arc::clone(logical_field), + None, + ) + .with_parquet_options(self.parquet_options.clone()), + ); + return Ok(Transformed::yes(cast_expr)); + } else if column.index() != phys_idx { + return Ok(Transformed::yes(remapped)); + } + } + } + Ok(Transformed::no(e)) + }) + .data() + } + + /// Handle a `CastExpr` produced by the default adapter (which wraps a + /// `Column` reference whose physical type differs from the logical type). + /// Apply Spark's rejection rules first, then either wrap with + /// [`SparkCastColumnExpr`] (for nested types) or leave as-is (for + /// primitive scalar casts). + fn handle_schema_mismatch_cast( + &self, + expr: Arc, + ) -> Result>> { + let Some(cast) = expr.downcast_ref::() else { + return Ok(Transformed::no(expr)); + }; + + // Only act on casts whose inner expression is a Column reference + // produced by the default adapter for schema-mismatch handling. + let Some(inner_column) = cast.expr().downcast_ref::() else { + return Ok(Transformed::no(Arc::clone(&expr))); + }; + + // Look up the physical field by the column name in the physical + // schema. This is the input to the cast. + let Ok(physical_field) = self + .physical_file_schema + .field_with_name(inner_column.name()) + else { + return Ok(Transformed::no(Arc::clone(&expr))); + }; + + let physical_type = physical_field.data_type(); + let target_field = cast.target_field(); + let target_type = target_field.data_type(); + let column_name = physical_field.name(); + + // Reject reading a string/binary Parquet column as anything else. 
+ // Spark's `ParquetVectorUpdaterFactory.getUpdater` BINARY case allows + // StringType / BinaryType, or DecimalType only when the column carries + // a `DecimalLogicalTypeAnnotation` (which arrow-rs surfaces as + // `Decimal128`, not `Binary`). Without this guard, runtime cast paths + // silently return nulls, parse strings, or surface as a generic + // Arrow type-mismatch error. + if is_string_or_binary(physical_type) && !is_string_or_binary(target_type) { + return Err(parquet_schema_convert_err( + column_name, + physical_type, + target_type, + )); + } + + // Reject reading a primitive numeric Parquet column as StringType / + // BinaryType. Spark has no `int -> string` etc. updater. Defer to + // runtime via `RejectOnNonEmpty` so empty Parquet files (SPARK-26709) + // pass and the JVM shim translates to + // `SchemaColumnConvertNotSupportedException`. + let physical_is_primitive_numeric = matches!( + physical_type, + DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Float32 + | DataType::Float64 + ); + if physical_is_primitive_numeric && is_string_or_binary(target_type) { + let rejection = reject_on_non_empty_expr( + Arc::clone(cast.expr()), + target_field, + column_name, + physical_type, + target_type, + ); + return Ok(Transformed::yes(rejection)); + } + + // Decimal-to-decimal narrowing. Spark's `isDecimalTypeMatched` (the + // `DecimalLogicalTypeAnnotation` branch) allows the read only when + // `dst_scale >= src_scale` AND + // `dst_precision - dst_scale >= src_precision - src_scale`. + // Either failure means silently dropping fractional digits or losing + // integer-side magnitude. 
+ if let (DataType::Decimal128(src_p, src_s), DataType::Decimal128(dst_p, dst_s)) = + (physical_type, target_type) + { + let src_int_precision = i32::from(*src_p) - i32::from(*src_s); + let dst_int_precision = i32::from(*dst_p) - i32::from(*dst_s); + if dst_s < src_s || dst_int_precision < src_int_precision { + return Err(parquet_schema_convert_err( + column_name, + physical_type, + target_type, + )); + } + } + + // Integer-to-decimal narrowing. Spark's `canReadAsDecimal` requires + // `precision - scale >= 10` for an INT32 source and `>= 20` for INT64. + // Unconditional in all Spark versions, so reject at plan time. + let int_decimal_min_int_precision = match physical_type { + DataType::Int8 | DataType::Int16 | DataType::Int32 => Some(10i32), + DataType::Int64 => Some(20i32), + _ => None, + }; + if let Some(min_int_precision) = int_decimal_min_int_precision { + let dst_precision_scale = match target_type { + DataType::Decimal128(p, s) | DataType::Decimal256(p, s) => Some((*p, *s)), + _ => None, + }; + if let Some((dst_p, dst_s)) = dst_precision_scale { + let dst_int_precision = i32::from(dst_p) - i32::from(dst_s); + if dst_int_precision < min_int_precision { + return Err(parquet_schema_convert_err( + column_name, + physical_type, + target_type, + )); + } + } + } + + // Type promotion (widening). When `allow_type_promotion` is false, + // reject the three widenings (INT32→INT64, FLOAT→DOUBLE, INT32→DOUBLE) + // that Spark 3.x's vectorized reader rejects. The flag tracks Spark's + // per-version vectorized-reader policy. Deferred to runtime so empty + // files (SPARK-26709) pass. 
+ if !self.parquet_options.allow_type_promotion { + let is_disallowed_promotion = matches!( + (physical_type, target_type), + (DataType::Int32, DataType::Int64) + | (DataType::Float32, DataType::Float64) + | (DataType::Int32, DataType::Float64) + ); + if is_disallowed_promotion { + let rejection = reject_on_non_empty_expr( + Arc::clone(cast.expr()), + target_field, + column_name, + physical_type, + target_type, + ); + return Ok(Transformed::yes(rejection)); + } + } + + // Reject primitive Parquet conversions Spark's vectorized reader + // rejects on every supported version (no matching branch in + // `ParquetVectorUpdaterFactory.getUpdater`): + // + // - `INT64 -> Int*` truncates lower bits. + // - `INT64 -> Float*` and `INT32 -> Float32` lose precision. + // - `Float* -> Int*` and `Float64 -> Float32` truncate / overflow. + // - `INT32 -> Timestamp` / `INT64 -> Date32` / `INT64 -> Timestamp`: + // date/timestamp-annotated columns surface as Date32 / Timestamp, + // so reaching this branch means the column was un-annotated. + // - `Date32 -> Timestamp(LTZ)`: Spark only allows + // Date -> TimestampNTZ. + // - `Timestamp -> Date32`: no Timestamp updater branches into Date. + // + // Deferred to runtime (SPARK-26709). + let is_spark_rejected_conversion = matches!( + (physical_type, target_type), + // Long -> narrower int. + ( + DataType::Int64, + DataType::Int8 | DataType::Int16 | DataType::Int32, + ) + // Long -> floating point. + | (DataType::Int64, DataType::Float32 | DataType::Float64) + // Long -> date / timestamp (raw INT64; annotated columns + // surface as Date32/Timestamp). + | (DataType::Int64, DataType::Date32) + | (DataType::Int64, DataType::Timestamp(_, _)) + // Int -> float (DoubleType is allowed via + // IntegerToDoubleUpdater; FloatType is not). + | ( + DataType::Int8 | DataType::Int16 | DataType::Int32, + DataType::Float32, + ) + // Int -> timestamp (raw INT32; DATE-annotated columns surface + // as Date32). 
+ | ( + DataType::Int8 | DataType::Int16 | DataType::Int32, + DataType::Timestamp(_, _), + ) + // Float -> int / Double -> int (no integer branches under + // FLOAT/DOUBLE). + | ( + DataType::Float32 | DataType::Float64, + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64, + ) + // Double -> float (narrowing). + | (DataType::Float64, DataType::Float32) + // Date -> Timestamp(LTZ). Spark allows Date -> TimestampNTZ + // only. + | (DataType::Date32, DataType::Timestamp(_, Some(_))) + // Timestamp -> Date. + | (DataType::Timestamp(_, _), DataType::Date32) + ); + if is_spark_rejected_conversion { + let rejection = reject_on_non_empty_expr( + Arc::clone(cast.expr()), + target_field, + column_name, + physical_type, + target_type, + ); + return Ok(Transformed::yes(rejection)); + } + + // Scalar/complex mismatch (e.g. TIMESTAMP read as ARRAY): + // Spark's vectorized reader rejects with + // SchemaColumnConvertNotSupportedException (SPARK-45604). Same-shape + // complex pairs and timestamp→timestamp / timestamp→int64 fall through + // to SparkCastColumnExpr below. + let is_complex = |t: &DataType| { + matches!( + t, + DataType::Struct(_) | DataType::List(_) | DataType::Map(_, _) + ) + }; + if is_complex(physical_type) != is_complex(target_type) { + return Err(parquet_schema_convert_err( + column_name, + physical_type, + target_type, + )); + } + + // Same-shape complex casts, timestamp tz relabel (e.g. + // Timestamp(us, None) -> Timestamp(us, Some("UTC")) for INT96 reads), + // and Timestamp -> Int64 (Spark's `nanosAsLong`) need + // `spark_parquet_convert`: it handles nested field selection, + // metadata-only tz changes, and raw-value reinterpretation that + // Spark's Cast would otherwise convert incorrectly. 
+ if matches!( + (physical_type, target_type), + (DataType::Struct(_), DataType::Struct(_)) + | (DataType::List(_), DataType::List(_)) + | (DataType::Map(_, _), DataType::Map(_, _)) + | (DataType::Timestamp(_, _), DataType::Timestamp(_, _)) + | (DataType::Timestamp(_, _), DataType::Int64) + ) { + let spark_cast: Arc = Arc::new( + SparkCastColumnExpr::new( + Arc::clone(cast.expr()), + Arc::new(physical_field.clone()), + Arc::clone(target_field), + None, + ) + .with_parquet_options(self.parquet_options.clone()), + ); + return Ok(Transformed::yes(spark_cast)); + } + + // For simple scalar type casts, leave the default-adapter-produced + // CastExpr in place. Future work can add a Spark-specific Cast + // PhysicalExpr to handle ANSI/Legacy mode differences for primitive + // casts; today DataFusion's CastExpr is used for those. + Ok(Transformed::no(expr)) + } + + /// Replace references to missing columns with default values. + fn replace_missing_with_defaults( + &self, + expr: Arc, + ) -> Result> { + let Some(defaults) = &self.default_values else { + return Ok(expr); + }; + + if defaults.is_empty() { + return Ok(expr); + } + + // Build owned (column_name, default_value) pairs for columns missing + // from the physical file. For each default: filter to only columns + // absent from the physical schema, then type-cast the value to match + // the logical schema's field type if they differ (using Spark cast + // semantics). + let missing_column_defaults: Vec<(String, ScalarValue)> = defaults + .iter() + .filter_map(|(col, val)| { + let col_name = col.name(); + + // Only include defaults for columns missing from the physical + // file schema. 
+ let is_missing = if self.parquet_options.case_sensitive { + self.physical_file_schema.field_with_name(col_name).is_err() + } else { + !self + .physical_file_schema + .fields() + .iter() + .any(|f| f.name().eq_ignore_ascii_case(col_name)) + }; + + if !is_missing { + return None; + } + + // Cast value to logical schema type if needed (only if types + // differ). + let value = self + .logical_file_schema + .field_with_name(col_name) + .ok() + .filter(|field| val.data_type() != *field.data_type()) + .and_then(|field| { + super::parquet_support::spark_parquet_convert( + ColumnarValue::Scalar(val.clone()), + field.data_type(), + &self.parquet_options, + ) + .ok() + .and_then(|cv| match cv { + ColumnarValue::Scalar(s) => Some(s), + _ => None, + }) + }) + .unwrap_or_else(|| val.clone()); + + Some((col_name.to_string(), value)) + }) + .collect(); + + let name_based: HashMap<&str, &ScalarValue> = missing_column_defaults + .iter() + .map(|(k, v)| (k.as_str(), v)) + .collect(); + + if name_based.is_empty() { + return Ok(expr); + } + + replace_columns_with_literals(expr, &name_based) + } +} + +/// Defers a Parquet type-promotion rejection to runtime: returns an empty +/// array when the input batch has no rows, and raises +/// [`ParquetSchemaError::SchemaConvert`] otherwise. +/// +/// Mirrors Spark's vectorized reader, which only invokes +/// `ParquetVectorUpdaterFactory.getUpdater` while decoding a row group. A +/// Parquet file with no row groups (e.g. one written from an empty DataFrame) +/// never triggers the per-row-group check, so a partition mixing such a file +/// with another whose schema would otherwise fail the type-promotion check +/// (SPARK-26709) is still readable. 
+#[derive(Debug, Eq)] +struct RejectOnNonEmpty { + child: Arc, + target_field: FieldRef, + column: String, + physical_type: String, + spark_type: String, +} + +impl PartialEq for RejectOnNonEmpty { + fn eq(&self, other: &Self) -> bool { + self.child.eq(&other.child) + && self.target_field.eq(&other.target_field) + && self.column == other.column + && self.physical_type == other.physical_type + && self.spark_type == other.spark_type + } +} + +impl Hash for RejectOnNonEmpty { + fn hash(&self, state: &mut H) { + self.child.hash(state); + self.target_field.hash(state); + self.column.hash(state); + self.physical_type.hash(state); + self.spark_type.hash(state); + } +} + +impl Display for RejectOnNonEmpty { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "REJECT_PARQUET_TYPE_PROMOTION({} AS {})", + self.column, self.spark_type + ) + } +} + +impl PhysicalExpr for RejectOnNonEmpty { + fn data_type(&self, _input_schema: &Schema) -> Result { + Ok(self.target_field.data_type().clone()) + } + + fn nullable(&self, _input_schema: &Schema) -> Result { + Ok(self.target_field.is_nullable()) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + if batch.num_rows() == 0 { + return Ok(ColumnarValue::Array(new_empty_array( + self.target_field.data_type(), + ))); + } + Err(ParquetSchemaError::SchemaConvert { + file_path: String::new(), + column: self.column.clone(), + physical_type: self.physical_type.clone(), + spark_type: self.spark_type.clone(), + } + .into()) + } + + fn return_field(&self, _input_schema: &Schema) -> Result { + Ok(Arc::clone(&self.target_field)) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + assert_eq!(children.len(), 1); + Ok(Arc::new(RejectOnNonEmpty { + child: children.pop().expect("child"), + target_field: Arc::clone(&self.target_field), + column: self.column.clone(), + physical_type: self.physical_type.clone(), + spark_type: 
self.spark_type.clone(), + })) + } + + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(self, f) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::fs::{File, create_dir_all}; + use std::path::PathBuf; + + use arrow::array::{ + Array, BinaryArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int32Array, Int64Array, StringArray, TimestampMicrosecondArray, UInt32Array, + }; + use arrow::datatypes::{Field, Schema, SchemaRef, TimeUnit}; + use arrow::record_batch::RecordBatch; + use datafusion::datasource::listing::PartitionedFile; + use datafusion::datasource::physical_plan::{ + FileGroup, FileScanConfigBuilder, ParquetSource, + }; + use datafusion::datasource::source::DataSourceExec; + use datafusion::execution::TaskContext; + use datafusion::execution::object_store::ObjectStoreUrl; + use datafusion::physical_plan::ExecutionPlan; + use futures::StreamExt; + use parquet::arrow::ArrowWriter; + + use crate::parquet::options::EvalMode; + + fn temp_parquet_path() -> PathBuf { + let mut path = std::env::temp_dir(); + path.push("datafusion-spark-tests"); + create_dir_all(&path).unwrap(); + path.push(format!("schema-adapter-{}.parquet", rand::random::())); + path + } + + /// Create a Parquet file containing a single batch and read it back using + /// the specified `required_schema` plus a [`SparkPhysicalExprAdapterFactory`]. + /// Returns the first batch read from the stream (or the first error). 
+ async fn roundtrip( + batch: &RecordBatch, + required_schema: SchemaRef, + ) -> Result { + let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + options.allow_cast_unsigned_ints = true; + roundtrip_with_options(batch, required_schema, options).await + } + + async fn roundtrip_with_options( + batch: &RecordBatch, + required_schema: SchemaRef, + options: SparkParquetOptions, + ) -> Result { + let path = temp_parquet_path(); + let path_str = path.to_str().unwrap().to_string(); + let file = File::create(&path).unwrap(); + let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + + let factory: Arc = + Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); + + let parquet_source = ParquetSource::new(required_schema); + let files = FileGroup::new(vec![PartitionedFile::from_path(path_str)?]); + let file_scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(parquet_source), + ) + .with_file_groups(vec![files]) + .with_expr_adapter(Some(factory)) + .build(); + + let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); + let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + stream.next().await.unwrap() + } + + /// Helper for the type-conversion rejection tests: write a 1-row batch and + /// assert that reading it under `read_type` raises `ParquetSchemaConvert`. 
+ async fn assert_rejected_conversion( + file_field: Field, + values: Arc, + read_type: DataType, + ) -> Result { + let file_schema = Arc::new(Schema::new(vec![file_field.clone()])); + let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![values])?; + let read_field_name = file_schema.field(0).name(); + let required_schema = Arc::new(Schema::new(vec![Field::new( + read_field_name, + read_type, + false, + )])); + let err = roundtrip(&batch, required_schema) + .await + .expect_err("expected ParquetSchemaConvert"); + Ok(err.to_string()) + } + + /// Reading a non-BINARY Parquet column as `StringType` must raise the same + /// `_LEGACY_ERROR_TEMP_2063`-shaped error as Spark's vectorized reader. + #[tokio::test] + async fn parquet_int_read_as_string_errors() -> Result<()> { + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Utf8, + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: string") + && msg.contains("Found: INT32"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// BINARY (string physical) read as IntegerType must raise the + /// Spark-compatible error. + #[tokio::test] + async fn parquet_string_read_as_int_errors() -> Result<()> { + let values = Arc::new(StringArray::from(vec!["bcd", "efg"])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Utf8, false), + values, + DataType::Int32, + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: int") + && msg.contains("Found: BINARY"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// Reading a plain BINARY Parquet column as `DecimalType` must raise a + /// `ParquetSchemaConvert`-shaped error. 
+ #[tokio::test] + async fn parquet_binary_read_as_decimal_errors() -> Result<()> { + let values = + Arc::new(BinaryArray::from_vec(vec![b"1.2", b"3.4"])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Binary, false), + values, + DataType::Decimal128(37, 1), + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: decimal(37,1)") + && msg.contains("Found: BINARY"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// INT32 -> Decimal where `precision - scale < 10` (the minimum that can + /// represent the full INT32 range). + #[tokio::test] + async fn parquet_int32_read_as_narrow_decimal_errors() -> Result<()> { + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Decimal128(9, 0), + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: decimal") + && msg.contains("Found: INT32"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// INT32 -> Decimal where `precision - scale >= 10` is allowed. + #[tokio::test] + async fn parquet_int32_read_as_wide_decimal_succeeds() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![values])?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(10, 0), + false, + )])); + let _ = roundtrip(&batch, required_schema).await?; + Ok(()) + } + + /// INT64 -> Decimal where `precision - scale < 20`. 
+ #[tokio::test] + async fn parquet_int64_read_as_narrow_decimal_errors() -> Result<()> { + let values = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int64, false), + values, + DataType::Decimal128(19, 0), + ) + .await?; + assert!( + msg.contains("Column: [[a]]") + && msg.contains("Expected: decimal") + && msg.contains("Found: INT64"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// Reading Decimal(P, S) as Decimal(P', S) where P' < P must raise the + /// Spark-compatible error. + #[tokio::test] + async fn parquet_decimal_precision_narrowing_errors() -> Result<()> { + let batch = decimal_batch(10, 2)?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(5, 2), + false, + )])); + + let err = roundtrip(&batch, required_schema).await.expect_err( + "expected ParquetSchemaConvert for Decimal(10, 2) -> Decimal(5, 2)", + ); + let msg = err.to_string(); + assert!( + msg.contains("Column: [[a]]") && msg.contains("Expected: decimal(5,2)"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// Reading Decimal(P, S) as Decimal(P', S') where the integer-precision + /// `P - S` shrinks must raise the Spark-compatible error. + #[tokio::test] + async fn parquet_decimal_int_precision_narrowing_errors() -> Result<()> { + let batch = decimal_batch(10, 4)?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(5, 2), + false, + )])); + + let err = roundtrip(&batch, required_schema).await.expect_err( + "expected ParquetSchemaConvert for Decimal(10, 4) -> Decimal(5, 2)", + ); + let msg = err.to_string(); + assert!(msg.contains("Column: [[a]]"), "unexpected error: {msg}"); + Ok(()) + } + + /// Sanity check: widening both precision and scale by the same amount is + /// allowed (the cast is lossless). 
+ #[tokio::test] + async fn parquet_decimal_widening_succeeds() -> Result<()> { + let batch = decimal_batch(5, 2)?; + let required_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(7, 4), + false, + )])); + let _ = roundtrip(&batch, required_schema).await?; + Ok(()) + } + + fn decimal_batch(precision: u8, scale: i8) -> Result { + let file_schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal128(precision, scale), + false, + )])); + let values = Arc::new( + Decimal128Array::from(vec![123i128, 456]) + .with_precision_and_scale(precision, scale) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?, + ) as Arc; + Ok(RecordBatch::try_new(file_schema, vec![values])?) + } + + /// `INT64 -> INT32` truncates to the lower 32 bits in DataFusion's cast. + /// Spark's vectorized reader rejects this. + #[tokio::test] + async fn parquet_long_read_as_int_errors() -> Result<()> { + let values = Arc::new(Int64Array::from(vec![1i64, 1 << 33])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int64, false), + values, + DataType::Int32, + ) + .await?; + assert!( + msg.contains("Found: INT64") && msg.contains("Expected: int"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `INT64 -> Float64` loses precision for large values; Spark rejects. + #[tokio::test] + async fn parquet_long_read_as_double_errors() -> Result<()> { + let values = + Arc::new(Int64Array::from(vec![1i64, (1i64 << 54) + 1])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int64, false), + values, + DataType::Float64, + ) + .await?; + assert!( + msg.contains("Found: INT64") && msg.contains("Expected: double"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Float64 -> Float32` overflows / loses precision; Spark rejects. 
+ #[tokio::test] + async fn parquet_double_read_as_float_errors() -> Result<()> { + let values = Arc::new(Float64Array::from(vec![1.5_f64, 1e40])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Float64, false), + values, + DataType::Float32, + ) + .await?; + assert!( + msg.contains("Found: DOUBLE") && msg.contains("Expected: float"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Float32 -> Int64` truncates the fractional part; Spark rejects. + #[tokio::test] + async fn parquet_float_read_as_long_errors() -> Result<()> { + let values = Arc::new(Float32Array::from(vec![1.5_f32, 2.5])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Float32, false), + values, + DataType::Int64, + ) + .await?; + assert!( + msg.contains("Found: FLOAT") && msg.contains("Expected: bigint"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Int32 -> Float32` loses precision for values past `2^24`; Spark rejects. + #[tokio::test] + async fn parquet_int_read_as_float_errors() -> Result<()> { + let values = Arc::new(Int32Array::from(vec![1, (1 << 25) + 1])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Float32, + ) + .await?; + assert!( + msg.contains("Found: INT32") && msg.contains("Expected: float"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Int32 -> Timestamp(_, None)`: raw INT32 reinterpreted as epoch seconds + /// produces dates near the Unix epoch. Only DATE-annotated INT32 columns + /// (which surface as `Date32`) are allowed to read as `TimestampNTZ`. 
+ #[tokio::test] + async fn parquet_int_read_as_timestamp_ntz_errors() -> Result<()> { + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int32, false), + values, + DataType::Timestamp(TimeUnit::Microsecond, None), + ) + .await?; + assert!( + msg.contains("Found: INT32") && msg.contains("Expected: timestamp"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Int64 -> Date32`: raw INT64 (no DATE annotation, otherwise the file + /// would surface as `Date32`). + #[tokio::test] + async fn parquet_long_read_as_date_errors() -> Result<()> { + let values = Arc::new(Int64Array::from(vec![1i64, 2])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Int64, false), + values, + DataType::Date32, + ) + .await?; + assert!( + msg.contains("Found: INT64") && msg.contains("Expected: date"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Date32 -> Timestamp(_, Some(_))` (LTZ). Spark allows + /// `Date -> TimestampNTZ` only. + #[tokio::test] + async fn parquet_date_read_as_ltz_timestamp_errors() -> Result<()> { + let values = Arc::new(Date32Array::from(vec![18262, 18263])) as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Date32, false), + values, + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + ) + .await?; + assert!( + msg.contains("Found: INT32") && msg.contains("Expected: timestamp"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// `Timestamp(_, _) -> Date32`: no Timestamp updater branches into + /// `DateType` in Spark. 
+ #[tokio::test] + async fn parquet_timestamp_read_as_date_errors() -> Result<()> { + let values = Arc::new(TimestampMicrosecondArray::from(vec![0i64, 1_000_000])) + as Arc; + let msg = assert_rejected_conversion( + Field::new("a", DataType::Timestamp(TimeUnit::Microsecond, None), false), + values, + DataType::Date32, + ) + .await?; + assert!(msg.contains("Expected: date"), "unexpected error: {msg}"); + Ok(()) + } + + /// SPARK-26709: an empty Parquet file with a column that would otherwise + /// fail the type-promotion check (INT32 read as INT64 when + /// `allow_type_promotion` is false) must still be readable. Spark's + /// vectorized reader only enforces the check per row group, so a file + /// with no row groups passes silently. + #[tokio::test] + async fn parquet_empty_file_disallowed_widening() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); + let path = temp_parquet_path(); + let path_str = path.to_str().unwrap().to_string(); + let file = File::create(&path)?; + let writer = ArrowWriter::try_new(file, Arc::clone(&file_schema), None).unwrap(); + writer.close().unwrap(); + + let required_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)])); + + let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + options.allow_type_promotion = false; + + let factory: Arc = + Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); + + let parquet_source = ParquetSource::new(required_schema); + let files = FileGroup::new(vec![PartitionedFile::from_path(path_str)?]); + let file_scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(parquet_source), + ) + .with_file_groups(vec![files]) + .with_expr_adapter(Some(factory)) + .build(); + + let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); + let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + while let Some(batch) = 
stream.next().await { + let batch = batch?; + assert_eq!(batch.num_rows(), 0); + } + Ok(()) + } + + /// Companion: a non-empty file with the same widening must raise + /// `ParquetSchemaConvert` at runtime (deferred from plan time). + #[tokio::test] + async fn parquet_non_empty_file_disallowed_widening_errors() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); + let values = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![values])?; + + let path = temp_parquet_path(); + let path_str = path.to_str().unwrap().to_string(); + let file = File::create(&path)?; + let mut writer = + ArrowWriter::try_new(file, Arc::clone(&file_schema), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let required_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)])); + + let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + options.allow_type_promotion = false; + + let factory: Arc = + Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); + + let parquet_source = ParquetSource::new(required_schema); + let files = FileGroup::new(vec![PartitionedFile::from_path(path_str)?]); + let file_scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(parquet_source), + ) + .with_file_groups(vec![files]) + .with_expr_adapter(Some(factory)) + .build(); + + let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); + let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + let first = stream.next().await.unwrap(); + let err = + first.expect_err("expected ParquetSchemaConvert error on non-empty file"); + let msg = err.to_string(); + assert!( + msg.contains("Column: [[col]]") + && msg.contains("Expected: bigint") + && msg.contains("Found: INT32"), + "unexpected error: {msg}" + ); + Ok(()) + } + + /// Roundtrip an unsigned 
integer column read as a signed integer of the + /// same width (Iceberg / Arrow files commonly use unsigned types where + /// Spark expects signed). + #[tokio::test] + async fn parquet_roundtrip_unsigned_int() -> Result<()> { + let file_schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)])); + + let ids = Arc::new(UInt32Array::from(vec![1, 2, 3])) as Arc; + let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![ids])?; + + let required_schema = + Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let _ = roundtrip(&batch, required_schema).await?; + Ok(()) + } + + /// Reading `b` from a file that contains `A`, `B`, and `b` in + /// case-insensitive mode must raise the `_LEGACY_ERROR_TEMP_2093`-shaped + /// duplicate-field error. + #[tokio::test] + async fn parquet_duplicate_fields_case_insensitive() -> Result<()> { + let file_schema = Arc::new(Schema::new(vec![ + Field::new("A", DataType::Int32, false), + Field::new("B", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let col_a = Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc; + let col_b1 = Arc::new(Int32Array::from(vec![4, 5, 6])) as Arc; + let col_b2 = Arc::new(Int32Array::from(vec![7, 8, 9])) as Arc; + let batch = + RecordBatch::try_new(Arc::clone(&file_schema), vec![col_a, col_b1, col_b2])?; + + let path = temp_parquet_path(); + let path_str = path.to_str().unwrap().to_string(); + let file = File::create(&path)?; + let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let required_schema = + Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, false)])); + + let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); + options.case_sensitive = false; + + let factory: Arc = + Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); + + let parquet_source = ParquetSource::new(required_schema); + let files = 
FileGroup::new(vec![PartitionedFile::from_path(path_str)?]); + let file_scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(parquet_source), + ) + .with_file_groups(vec![files]) + .with_expr_adapter(Some(factory)) + .build(); + + let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); + let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + let result = stream.next().await.unwrap(); + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Found duplicate field"), + "expected duplicate field error, got: {err_msg}" + ); + Ok(()) + } +} From 6aa9925a8a3a2353ba5c6ba4be740a9087a76d07 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 May 2026 11:22:13 -0600 Subject: [PATCH 2/2] refactor(spark/parquet): code review cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Short-circuit duplicate-field tree walk via Err propagation. - Cache `types_match` on `SparkCastColumnExpr` to skip per-batch DataType compare. - Drop redundant `Option` for `original_physical_schema` (gate on `case_sensitive` directly). - Pre-build case-folded HashSet of physical names for O(1) `is_missing` lookup in `replace_missing_with_defaults` (was O(d × n)). - Replace `O(n × m)` `eq_ignore_ascii_case` scan in `remap_physical_schema` with `HashSet` of pre-lowercased names. - Drop micros→millis specialization in `SparkCastColumnExpr::evaluate`; arrow's cast handles it. - Dedupe `parse_field_id`/`field_id` between `schema_adapter.rs` and `parquet_support.rs`. - Drop unnecessary `array.data_type().clone()` in `parquet_convert_array`. - Reduce `remap_physical_schema` parameter list (3 bools → `&SparkParquetOptions`). - Extract `find_field` and `rename_field` helpers; collapse 3x case-sensitive lookup duplication and 2x field-rename block duplication. 
- Reduce test boilerplate: extract `execute_with_factory` helper used by all 3 odd-out tests; `temp_parquet_path()` returns `String` directly. Co-Authored-By: Claude Opus 4.7 --- datafusion/spark/src/parquet/cast_column.rs | 132 ++------ .../spark/src/parquet/parquet_support.rs | 5 +- .../spark/src/parquet/schema_adapter.rs | 306 +++++++----------- 3 files changed, 145 insertions(+), 298 deletions(-) diff --git a/datafusion/spark/src/parquet/cast_column.rs b/datafusion/spark/src/parquet/cast_column.rs index eb11deb63e82c..f1245a62b9308 100644 --- a/datafusion/spark/src/parquet/cast_column.rs +++ b/datafusion/spark/src/parquet/cast_column.rs @@ -25,14 +25,13 @@ use std::hash::Hash; use std::sync::Arc; use arrow::array::{ - Array, ArrayRef, LargeListArray, ListArray, MapArray, StructArray, - TimestampMicrosecondArray, TimestampMillisecondArray, make_array, + Array, ArrayRef, LargeListArray, ListArray, MapArray, StructArray, make_array, }; use arrow::compute::CastOptions; -use arrow::datatypes::{DataType, FieldRef, Schema, TimeUnit}; +use arrow::datatypes::{DataType, FieldRef, Schema}; use arrow::record_batch::RecordBatch; +use datafusion_common::Result; use datafusion_common::format::DEFAULT_CAST_OPTIONS; -use datafusion_common::{Result, ScalarValue}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; @@ -151,39 +150,6 @@ fn relabel_array(array: ArrayRef, target_type: &DataType) -> ArrayRef { } } -/// Casts a `Timestamp(Microsecond)` array to `Timestamp(Millisecond)` by -/// dividing values by 1000. Preserves the timezone from the target type. 
-fn cast_timestamp_micros_to_millis_array( - array: &ArrayRef, - target_tz: Option>, -) -> ArrayRef { - let micros_array = array - .as_any() - .downcast_ref::() - .expect("Expected TimestampMicrosecondArray"); - - let millis_values: TimestampMillisecondArray = - arrow::compute::kernels::arity::unary(micros_array, |v| v / 1000); - - let result = if let Some(tz) = target_tz { - millis_values.with_timezone(tz) - } else { - millis_values - }; - - Arc::new(result) -} - -/// Casts a `Timestamp(Microsecond)` scalar to `Timestamp(Millisecond)` by -/// dividing the value by 1000. Preserves the timezone from the target type. -fn cast_timestamp_micros_to_millis_scalar( - opt_val: Option, - target_tz: Option>, -) -> ScalarValue { - let new_val = opt_val.map(|v| v / 1000); - ScalarValue::TimestampMillisecond(new_val, target_tz) -} - /// A column-level cast that adapts a Parquet column to its requested type /// using Spark semantics for nested types and a handful of primitive cases /// that Arrow's cast does not handle correctly for Spark (e.g. timestamp @@ -202,6 +168,12 @@ pub struct SparkCastColumnExpr { /// Spark parquet options for complex nested type conversions. /// When present, enables [`spark_parquet_convert`] as a fallback. parquet_options: Option, + /// Pre-computed `input_physical_field.data_type() == target_field.data_type()`. + /// Skips the per-batch recursive `DataType` comparison in [`Self::evaluate`] + /// for the common pass-through case (the input is structurally identical + /// to the target and only needs a metadata relabel — but the relabel + /// itself is detected separately by [`types_differ_only_in_field_names`]). 
+ types_match: bool, } // Manually derive `PartialEq`/`Hash` because `Arc` does not @@ -234,12 +206,14 @@ impl SparkCastColumnExpr { target_field: FieldRef, cast_options: Option>, ) -> Self { + let types_match = physical_field.data_type() == target_field.data_type(); Self { expr, input_physical_field: physical_field, target_field, cast_options: cast_options.unwrap_or(DEFAULT_CAST_OPTIONS), parquet_options: None, + types_match, } } @@ -273,12 +247,7 @@ impl PhysicalExpr for SparkCastColumnExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let value = self.expr.evaluate(batch)?; - // Use `==` (PartialEq) instead of `equals_datatype` because - // `equals_datatype` ignores field names in nested types (Struct, - // List, Map). We need to detect when field names differ (e.g., - // Struct("a","b") vs Struct("c","d")) so that we can apply - // `spark_parquet_convert` for field-name-based selection. - if value.data_type() == *self.target_field.data_type() { + if self.types_match { return Ok(value); } @@ -286,25 +255,6 @@ impl PhysicalExpr for SparkCastColumnExpr { let target_field = self.target_field.data_type(); match (input_physical_field, target_field) { - // Timestamp(Microsecond) -> Timestamp(Millisecond) - ( - DataType::Timestamp(TimeUnit::Microsecond, _), - DataType::Timestamp(TimeUnit::Millisecond, target_tz), - ) => match value { - ColumnarValue::Array(array) => { - let casted = - cast_timestamp_micros_to_millis_array(&array, target_tz.clone()); - Ok(ColumnarValue::Array(casted)) - } - ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(opt_val, _)) => { - let casted = cast_timestamp_micros_to_millis_scalar( - opt_val, - target_tz.clone(), - ); - Ok(ColumnarValue::Scalar(casted)) - } - _ => Ok(value), - }, // Nested types that differ only in field names (e.g., List // element named "item" vs "element", or Map entries named // "key_value" vs "entries"). 
Re-label the array so the DataType @@ -370,54 +320,13 @@ impl PhysicalExpr for SparkCastColumnExpr { #[cfg(test)] mod tests { use super::*; - use arrow::array::{Int32Array, StringArray}; + use arrow::array::{ + Int32Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, + }; use arrow::buffer::OffsetBuffer; - use arrow::datatypes::{Field, Fields}; + use arrow::datatypes::{Field, Fields, TimeUnit}; use datafusion_physical_expr::expressions::Column; - #[test] - fn test_cast_timestamp_micros_to_millis_array() { - let micros_array: TimestampMicrosecondArray = vec![ - Some(1_000_000), - Some(2_500_000), - None, - Some(0), - Some(-1_000_000), - ] - .into(); - let array_ref: ArrayRef = Arc::new(micros_array); - - let result = cast_timestamp_micros_to_millis_array(&array_ref, None); - let millis_array = result - .as_any() - .downcast_ref::() - .expect("Expected TimestampMillisecondArray"); - - assert_eq!(millis_array.len(), 5); - assert_eq!(millis_array.value(0), 1000); - assert_eq!(millis_array.value(1), 2500); - assert!(millis_array.is_null(2)); - assert_eq!(millis_array.value(3), 0); - assert_eq!(millis_array.value(4), -1000); - } - - #[test] - fn test_cast_timestamp_micros_to_millis_scalar() { - let result = cast_timestamp_micros_to_millis_scalar(Some(1_500_000), None); - assert_eq!(result, ScalarValue::TimestampMillisecond(Some(1500), None)); - - let null_result = cast_timestamp_micros_to_millis_scalar(None, None); - assert_eq!(null_result, ScalarValue::TimestampMillisecond(None, None)); - - let target_tz: Option> = Some(Arc::from("UTC")); - let tz_result = - cast_timestamp_micros_to_millis_scalar(Some(2_000_000), target_tz.clone()); - assert_eq!( - tz_result, - ScalarValue::TimestampMillisecond(Some(2000), target_tz) - ); - } - #[test] fn test_relabel_list_field_name() { // Physical: List(Field("item", Int32)) @@ -479,6 +388,8 @@ mod tests { #[test] fn test_evaluate_micros_to_millis_array() { + use crate::parquet::options::EvalMode; + let 
input_field = Arc::new(Field::new( "ts", DataType::Timestamp(TimeUnit::Microsecond, None), @@ -495,7 +406,12 @@ mod tests { let col_expr: Arc = Arc::new(Column::new("ts", 0)); let cast_expr = - SparkCastColumnExpr::new(col_expr, input_field, target_field, None); + SparkCastColumnExpr::new(col_expr, input_field, target_field, None) + .with_parquet_options(SparkParquetOptions::new( + EvalMode::Legacy, + "UTC", + false, + )); let micros_array: TimestampMicrosecondArray = vec![Some(1_000_000), Some(2_000_000), None].into(); diff --git a/datafusion/spark/src/parquet/parquet_support.rs b/datafusion/spark/src/parquet/parquet_support.rs index 7b81f2de4a773..e0c3aa0d3db21 100644 --- a/datafusion/spark/src/parquet/parquet_support.rs +++ b/datafusion/spark/src/parquet/parquet_support.rs @@ -85,11 +85,10 @@ fn parquet_convert_array( parquet_options: &SparkParquetOptions, ) -> Result { use DataType::*; - let from_type = array.data_type().clone(); // Strip dictionary encoding before converting; the result is rebuilt as a // dictionary if the target type is itself dictionary-encoded. - let array = match &from_type { + let array = match array.data_type() { Dictionary(key_type, value_type) if key_type.as_ref() == &Int32 && (value_type.as_ref() == &Utf8 @@ -195,7 +194,7 @@ fn parquet_convert_array( } /// Read the Parquet field id from arrow-rs's `PARQUET_FIELD_ID_META_KEY`. 
-fn field_id(field: &Field) -> Option { +pub(super) fn field_id(field: &Field) -> Option { field .metadata() .get(PARQUET_FIELD_ID_META_KEY) diff --git a/datafusion/spark/src/parquet/schema_adapter.rs b/datafusion/spark/src/parquet/schema_adapter.rs index 43fbc8dae85bf..290a8672c289a 100644 --- a/datafusion/spark/src/parquet/schema_adapter.rs +++ b/datafusion/spark/src/parquet/schema_adapter.rs @@ -53,11 +53,11 @@ use datafusion_physical_expr_adapter::{ replace_columns_with_literals, }; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use super::cast_column::SparkCastColumnExpr; use super::error::ParquetSchemaError; use super::options::SparkParquetOptions; +use super::parquet_support::field_id as parse_field_id; /// Factory that creates Spark-compatible [`PhysicalExprAdapter`] instances. /// @@ -86,19 +86,42 @@ impl SparkPhysicalExprAdapterFactory { } } -/// Read the Parquet field id stored under arrow-rs's -/// `PARQUET_FIELD_ID_META_KEY`. -fn parse_field_id(field: &Field) -> Option { - field - .metadata() - .get(PARQUET_FIELD_ID_META_KEY) - .and_then(|v| v.parse::().ok()) -} - fn schema_has_field_ids(schema: &SchemaRef) -> bool { schema.fields().iter().any(|f| parse_field_id(f).is_some()) } +/// Look up a field in `schema` by name. Honors `case_sensitive`: when false, +/// matches case-insensitively. Returns `(index, field)` for use sites that need +/// both. +fn find_field<'a>( + schema: &'a SchemaRef, + name: &str, + case_sensitive: bool, +) -> Option<(usize, &'a FieldRef)> { + if case_sensitive { + schema + .index_of(name) + .ok() + .map(|idx| (idx, &schema.fields()[idx])) + } else { + schema + .fields() + .iter() + .enumerate() + .find(|(_, f)| f.name().eq_ignore_ascii_case(name)) + } +} + +/// Build a copy of `field` renamed to `new_name`, preserving its data type, +/// nullability, and metadata. 
Used by [`remap_physical_schema`] when matching +/// physical fields to logical fields by id or by case-insensitive name. +fn rename_field(field: &Field, new_name: &str) -> FieldRef { + Arc::new( + Field::new(new_name, field.data_type().clone(), field.is_nullable()) + .with_metadata(field.metadata().clone()), + ) +} + /// Remap physical schema field names to match logical schema field names. /// Mirrors Spark's `clipParquetGroupFields`: prefer ID match for any logical /// field that carries a `PARQUET:field_id`; fall back to case-insensitive @@ -114,14 +137,13 @@ fn schema_has_field_ids(schema: &SchemaRef) -> bool { fn remap_physical_schema( logical_schema: &SchemaRef, physical_schema: &SchemaRef, - case_sensitive: bool, - use_field_id: bool, - ignore_missing_field_id: bool, + options: &SparkParquetOptions, ) -> Result<(SchemaRef, HashMap)> { - let should_match_by_id = use_field_id && schema_has_field_ids(logical_schema); + let case_sensitive = options.case_sensitive; + let should_match_by_id = options.use_field_id && schema_has_field_ids(logical_schema); if should_match_by_id - && !ignore_missing_field_id + && !options.ignore_missing_field_id && !schema_has_field_ids(physical_schema) { // Mirrors `ParquetReadSupport.inferSchema`'s eager check (Spark throws @@ -186,7 +208,7 @@ fn remap_physical_schema( if id_to_phys_names.contains_key(&id) { None } else { - Some(lf.name().clone()) + Some(lf.name().to_ascii_lowercase()) } }) }) @@ -208,14 +230,7 @@ fn remap_physical_schema( { if logical_field.name() != field.name() { name_map.insert(logical_field.name().clone(), field.name().clone()); - return Arc::new( - Field::new( - logical_field.name(), - field.data_type().clone(), - field.is_nullable(), - ) - .with_metadata(field.metadata().clone()), - ); + return rename_field(field, logical_field.name()); } return Arc::clone(field); } @@ -224,16 +239,11 @@ fn remap_physical_schema( // ID is missing from the file. Mirrors Spark's // `generateFakeColumnName` in `matchIdField`. 
if should_match_by_id - && unmatched_id_logical_names - .iter() - .any(|name| name.eq_ignore_ascii_case(field.name())) + && unmatched_id_logical_names.contains(&field.name().to_ascii_lowercase()) { fake_counter += 1; let fake_name = format!("__datafusion_unmatched_field_id_{fake_counter}"); - return Arc::new( - Field::new(fake_name, field.data_type().clone(), field.is_nullable()) - .with_metadata(field.metadata().clone()), - ); + return rename_field(field, &fake_name); } // Name match. Spark's `matchIdField` does not fall through to a @@ -248,14 +258,7 @@ fn remap_physical_schema( && logical_field.name() != field.name() { name_map.insert(logical_field.name().clone(), field.name().clone()); - return Arc::new( - Field::new( - logical_field.name(), - field.data_type().clone(), - field.is_nullable(), - ) - .with_metadata(field.metadata().clone()), - ); + return rename_field(field, logical_field.name()); } } @@ -411,17 +414,11 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { let needs_remap = !self.parquet_options.case_sensitive || (self.parquet_options.use_field_id && schema_has_field_ids(&logical_file_schema)); - let ( - adapted_physical_schema, - logical_to_physical_names, - original_physical_schema, - ) = if needs_remap { + let (adapted_physical_schema, logical_to_physical_names) = if needs_remap { let (remapped, logical_to_physical) = remap_physical_schema( &logical_file_schema, &physical_file_schema, - self.parquet_options.case_sensitive, - self.parquet_options.use_field_id, - self.parquet_options.ignore_missing_field_id, + &self.parquet_options, )?; ( remapped, @@ -430,17 +427,9 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { } else { Some(logical_to_physical) }, - // Keep the original physical schema for per-column - // duplicate detection. Only meaningful in - // case-insensitive mode (matches existing behavior). 
- if !self.parquet_options.case_sensitive { - Some(Arc::clone(&physical_file_schema)) - } else { - None - }, ) } else { - (Arc::clone(&physical_file_schema), None, None) + (Arc::clone(&physical_file_schema), None) }; let default_factory = DefaultPhysicalExprAdapterFactory; @@ -451,12 +440,12 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { Ok(Arc::new(SparkPhysicalExprAdapter { logical_file_schema, + original_physical_schema: physical_file_schema, physical_file_schema: adapted_physical_schema, parquet_options: self.parquet_options.clone(), default_values: self.default_values.clone(), default_adapter, logical_to_physical_names, - original_physical_schema, })) } } @@ -491,10 +480,9 @@ struct SparkPhysicalExprAdapter { /// names so that downstream code can find columns in the actual stream /// schema. logical_to_physical_names: Option>, - /// The original (un-remapped) physical schema, kept for per-column - /// duplicate detection in case-insensitive mode. Only set when - /// `!case_sensitive`. - original_physical_schema: Option, + /// The original (un-remapped) physical schema, used for case-insensitive + /// duplicate detection. + original_physical_schema: SchemaRef, } impl PhysicalExprAdapter for SparkPhysicalExprAdapter { @@ -503,26 +491,22 @@ impl PhysicalExprAdapter for SparkPhysicalExprAdapter { // references a field with multiple case-insensitive matches in the // physical schema. Only the columns actually referenced trigger the // error (not the whole schema). - if let Some(orig_physical) = &self.original_physical_schema { - let mut duplicate_err: Option = None; - let _ = Arc::::clone(&expr).transform(|e| { + if !self.parquet_options.case_sensitive { + // Returning `Err` from `transform` short-circuits the tree walk on + // the first match. 
+ Arc::::clone(&expr).transform(|e| { if let Some(col) = e.downcast_ref::() && let Some((req, matched)) = - check_column_duplicate(col.name(), orig_physical) + check_column_duplicate(col.name(), &self.original_physical_schema) { - duplicate_err = Some( - ParquetSchemaError::DuplicateFieldCaseInsensitive { - required_field_name: req, - matched_fields: matched, - } - .into(), - ); + return Err(ParquetSchemaError::DuplicateFieldCaseInsensitive { + required_field_name: req, + matched_fields: matched, + } + .into()); } Ok(Transformed::no(e)) - }); - if let Some(err) = duplicate_err { - return Err(err); - } + })?; } // First let the default adapter handle column remapping, missing @@ -597,43 +581,15 @@ impl SparkPhysicalExprAdapter { // that schema — not the logical or physical file schemas. // DataFusion's `DefaultPhysicalExprAdapter::resolve_physical_column` // also resolves by name for the same reason. - let logical_field = if self.parquet_options.case_sensitive { - self.logical_file_schema - .fields() - .iter() - .find(|f| f.name() == col_name) - } else { - self.logical_file_schema - .fields() - .iter() - .find(|f| f.name().eq_ignore_ascii_case(col_name)) - }; - let physical_field = if self.parquet_options.case_sensitive { - self.physical_file_schema - .fields() - .iter() - .find(|f| f.name() == col_name) - } else { - self.physical_file_schema - .fields() - .iter() - .find(|f| f.name().eq_ignore_ascii_case(col_name)) - }; - - // Remap the column index to the physical file schema so - // downstream evaluation reads the correct column from the - // parquet batch. 
- let physical_index = if self.parquet_options.case_sensitive { - self.physical_file_schema.index_of(col_name).ok() - } else { - self.physical_file_schema - .fields() - .iter() - .position(|f| f.name().eq_ignore_ascii_case(col_name)) - }; - - if let (Some(logical_field), Some(physical_field), Some(phys_idx)) = - (logical_field, physical_field, physical_index) + let case_sensitive = self.parquet_options.case_sensitive; + let logical_field = + find_field(&self.logical_file_schema, col_name, case_sensitive) + .map(|(_, f)| f); + let physical_match = + find_field(&self.physical_file_schema, col_name, case_sensitive); + + if let (Some(logical_field), Some((phys_idx, physical_field))) = + (logical_field, physical_match) { let remapped: Arc = if column.index() != phys_idx { Arc::new(Column::new(col_name, phys_idx)) @@ -954,6 +910,23 @@ impl SparkPhysicalExprAdapter { return Ok(expr); } + // Pre-compute the case-folded physical-name set so that the `is_missing` + // check is O(1) per default rather than O(physical_fields). For + // case-sensitive mode, `field_with_name` already does a HashMap lookup + // internally, so no pre-build is needed. + let lowercased_physical_names: Option> = + if self.parquet_options.case_sensitive { + None + } else { + Some( + self.physical_file_schema + .fields() + .iter() + .map(|f| f.name().to_ascii_lowercase()) + .collect(), + ) + }; + // Build owned (column_name, default_value) pairs for columns missing // from the physical file. For each default: filter to only columns // absent from the physical schema, then type-cast the value to match @@ -966,14 +939,10 @@ impl SparkPhysicalExprAdapter { // Only include defaults for columns missing from the physical // file schema. 
- let is_missing = if self.parquet_options.case_sensitive { - self.physical_file_schema.field_with_name(col_name).is_err() + let is_missing = if let Some(names) = &lowercased_physical_names { + !names.contains(&col_name.to_ascii_lowercase()) } else { - !self - .physical_file_schema - .fields() - .iter() - .any(|f| f.name().eq_ignore_ascii_case(col_name)) + self.physical_file_schema.field_with_name(col_name).is_err() }; if !is_missing { @@ -1123,7 +1092,6 @@ mod tests { use super::*; use std::fs::{File, create_dir_all}; - use std::path::PathBuf; use arrow::array::{ Array, BinaryArray, Date32Array, Decimal128Array, Float32Array, Float64Array, @@ -1144,12 +1112,36 @@ mod tests { use crate::parquet::options::EvalMode; - fn temp_parquet_path() -> PathBuf { + fn temp_parquet_path() -> String { let mut path = std::env::temp_dir(); path.push("datafusion-spark-tests"); create_dir_all(&path).unwrap(); path.push(format!("schema-adapter-{}.parquet", rand::random::())); - path + path.into_os_string().into_string().unwrap() + } + + /// Build a `DataSourceExec` stream over an existing Parquet file using a + /// `SparkPhysicalExprAdapterFactory` configured from `options`. 
+ fn execute_with_factory( + path: String, + required_schema: SchemaRef, + options: SparkParquetOptions, + ) -> Result { + let factory: Arc = + Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); + + let parquet_source = ParquetSource::new(required_schema); + let files = FileGroup::new(vec![PartitionedFile::from_path(path)?]); + let file_scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(parquet_source), + ) + .with_file_groups(vec![files]) + .with_expr_adapter(Some(factory)) + .build(); + + let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); + parquet_exec.execute(0, Arc::new(TaskContext::default())) } /// Create a Parquet file containing a single batch and read it back using @@ -1170,27 +1162,12 @@ mod tests { options: SparkParquetOptions, ) -> Result { let path = temp_parquet_path(); - let path_str = path.to_str().unwrap().to_string(); let file = File::create(&path).unwrap(); let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); writer.write(batch).unwrap(); writer.close().unwrap(); - let factory: Arc = - Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); - - let parquet_source = ParquetSource::new(required_schema); - let files = FileGroup::new(vec![PartitionedFile::from_path(path_str)?]); - let file_scan_config = FileScanConfigBuilder::new( - ObjectStoreUrl::local_filesystem(), - Arc::new(parquet_source), - ) - .with_file_groups(vec![files]) - .with_expr_adapter(Some(factory)) - .build(); - - let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); - let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + let mut stream = execute_with_factory(path, required_schema, options)?; stream.next().await.unwrap() } @@ -1568,7 +1545,6 @@ mod tests { let file_schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)])); let path = temp_parquet_path(); - let path_str = path.to_str().unwrap().to_string(); let file = 
File::create(&path)?; let writer = ArrowWriter::try_new(file, Arc::clone(&file_schema), None).unwrap(); writer.close().unwrap(); @@ -1579,21 +1555,7 @@ mod tests { let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); options.allow_type_promotion = false; - let factory: Arc = - Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); - - let parquet_source = ParquetSource::new(required_schema); - let files = FileGroup::new(vec![PartitionedFile::from_path(path_str)?]); - let file_scan_config = FileScanConfigBuilder::new( - ObjectStoreUrl::local_filesystem(), - Arc::new(parquet_source), - ) - .with_file_groups(vec![files]) - .with_expr_adapter(Some(factory)) - .build(); - - let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); - let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + let mut stream = execute_with_factory(path, required_schema, options)?; while let Some(batch) = stream.next().await { let batch = batch?; assert_eq!(batch.num_rows(), 0); @@ -1611,7 +1573,6 @@ mod tests { let batch = RecordBatch::try_new(Arc::clone(&file_schema), vec![values])?; let path = temp_parquet_path(); - let path_str = path.to_str().unwrap().to_string(); let file = File::create(&path)?; let mut writer = ArrowWriter::try_new(file, Arc::clone(&file_schema), None).unwrap(); @@ -1624,21 +1585,7 @@ mod tests { let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); options.allow_type_promotion = false; - let factory: Arc = - Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); - - let parquet_source = ParquetSource::new(required_schema); - let files = FileGroup::new(vec![PartitionedFile::from_path(path_str)?]); - let file_scan_config = FileScanConfigBuilder::new( - ObjectStoreUrl::local_filesystem(), - Arc::new(parquet_source), - ) - .with_file_groups(vec![files]) - .with_expr_adapter(Some(factory)) - .build(); - - let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); - let mut 
stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + let mut stream = execute_with_factory(path, required_schema, options)?; let first = stream.next().await.unwrap(); let err = first.expect_err("expected ParquetSchemaConvert error on non-empty file"); @@ -1688,7 +1635,6 @@ mod tests { RecordBatch::try_new(Arc::clone(&file_schema), vec![col_a, col_b1, col_b2])?; let path = temp_parquet_path(); - let path_str = path.to_str().unwrap().to_string(); let file = File::create(&path)?; let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); writer.write(&batch).unwrap(); @@ -1700,21 +1646,7 @@ mod tests { let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); options.case_sensitive = false; - let factory: Arc = - Arc::new(SparkPhysicalExprAdapterFactory::new(options, None)); - - let parquet_source = ParquetSource::new(required_schema); - let files = FileGroup::new(vec![PartitionedFile::from_path(path_str)?]); - let file_scan_config = FileScanConfigBuilder::new( - ObjectStoreUrl::local_filesystem(), - Arc::new(parquet_source), - ) - .with_file_groups(vec![files]) - .with_expr_adapter(Some(factory)) - .build(); - - let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config)); - let mut stream = parquet_exec.execute(0, Arc::new(TaskContext::default()))?; + let mut stream = execute_with_factory(path, required_schema, options)?; let result = stream.next().await.unwrap(); assert!(result.is_err());