diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index cdd6215d08e2f..56b6ac02fe7e2 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -189,6 +189,15 @@ pub fn cast_column( (DataType::LargeListView(_), DataType::LargeListView(target_inner)) => { cast_list_view_column::(source_col, target_inner, cast_options) } + ( + DataType::FixedSizeList(_, _), + DataType::FixedSizeList(target_inner, target_size), + ) => cast_fixed_size_list_column( + source_col, + target_inner, + *target_size, + cast_options, + ), ( DataType::Dictionary(source_key_type, _), DataType::Dictionary(target_key_type, target_value_type), @@ -208,15 +217,8 @@ fn cast_list_column( target_inner_field: &FieldRef, cast_options: &CastOptions, ) -> Result { - let source_list = source_col - .as_any() - .downcast_ref::>() - .ok_or_else(|| { - crate::error::DataFusionError::Plan(format!( - "Expected list array but got {}", - source_col.data_type() - )) - })?; + let source_list = + downcast_list_array::>(source_col, "list array")?; let cast_values = cast_column( source_list.values(), @@ -238,15 +240,8 @@ fn cast_list_view_column( target_inner_field: &FieldRef, cast_options: &CastOptions, ) -> Result { - let source_list = source_col - .as_any() - .downcast_ref::>() - .ok_or_else(|| { - crate::error::DataFusionError::Plan(format!( - "Expected list view array but got {}", - source_col.data_type() - )) - })?; + let source_list = + downcast_list_array::>(source_col, "list view array")?; let cast_values = cast_column( source_list.values(), @@ -264,6 +259,53 @@ fn cast_list_view_column( Ok(Arc::new(result)) } +fn cast_fixed_size_list_column( + source_col: &ArrayRef, + target_inner_field: &FieldRef, + target_size: i32, + cast_options: &CastOptions, +) -> Result { + use arrow::array::FixedSizeListArray; + + let source_list = + downcast_list_array::(source_col, "fixed size list array")?; + + let source_size = source_list.value_length(); + if source_size != target_size { + return _plan_err!( + "Cannot cast FixedSizeList column with size {} to size {}", + source_size, + target_size + ); + } + + let cast_values = cast_column( + source_list.values(), + target_inner_field.data_type(), + cast_options, + )?; + + let result = FixedSizeListArray::new( + Arc::clone(target_inner_field), + target_size, + cast_values, + source_list.nulls().cloned(), + ); + Ok(Arc::new(result)) +} + +fn downcast_list_array<'a, A: Array + 'static>( + source_col: &'a ArrayRef, + expected: &str, +) -> Result<&'a A> { + source_col.as_any().downcast_ref::().ok_or_else(|| { + crate::error::DataFusionError::Plan(format!( + "Expected {expected} but got {}", + source_col.data_type() + )) + }) +} + fn cast_dictionary_column( source_col: &ArrayRef, source_key_type: &DataType, @@ -431,6 +473,21 @@ pub fn validate_data_type_compatibility( | (DataType::LargeListView(s), DataType::LargeListView(t)) => { validate_field_compatibility(s, t)?; } + ( + DataType::FixedSizeList(s, source_size), + DataType::FixedSizeList(t, target_size), + ) => { + // FixedSizeList sizes must match before nested field checks. + if source_size != target_size { + return _plan_err!( + "Cannot cast FixedSizeList field '{}' with size {} to size {}", + field_name, + source_size, + target_size + ); + } + validate_field_compatibility(s, t)?; + } (DataType::Dictionary(s_key, s_val), DataType::Dictionary(t_key, t_val)) => { if !can_cast_types(s_key, t_key) { return _plan_err!( @@ -460,7 +517,7 @@ pub fn validate_data_type_compatibility( /// name-based nested struct casting logic, rather than Arrow's standard cast. /// /// This is the case when both types are struct types, or both are the same -/// container type (List, LargeList, ListView, LargeListView, Dictionary) wrapping +/// container type (List, LargeList, ListView, LargeListView, FixedSizeList, Dictionary) wrapping /// types that recursively contain structs. /// /// Use this predicate at both planning time (to decide whether to apply struct @@ -475,7 +532,8 @@ pub fn requires_nested_struct_cast( (DataType::List(s), DataType::List(t)) | (DataType::LargeList(s), DataType::LargeList(t)) | (DataType::ListView(s), DataType::ListView(t)) - | (DataType::LargeListView(s), DataType::LargeListView(t)) => { + | (DataType::LargeListView(s), DataType::LargeListView(t)) + | (DataType::FixedSizeList(s, _), DataType::FixedSizeList(t, _)) => { requires_nested_struct_cast(s.data_type(), t.data_type()) } (DataType::Dictionary(_, s_val), DataType::Dictionary(_, t_val)) => { @@ -1336,4 +1394,98 @@ mod tests { &DataType::List(arc_field("item", DataType::Int64)), )); } + + #[test] + fn test_cast_fixed_size_list_struct_incompatible_type_fails() { + // Build a FixedSizeList and try to cast to + // FixedSizeList (incompatible types). + let struct_arr = StructArray::from(vec![( + arc_field("a", DataType::Utf8), + Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef, + )]); + + let source_field = + arc_field("item", struct_type(vec![field("a", DataType::Utf8)])); + use arrow::array::FixedSizeListArray; + let source_list = + FixedSizeListArray::new(source_field, 1, Arc::new(struct_arr), None); + let source_col: ArrayRef = Arc::new(source_list); + + let target_field = + arc_field("item", struct_type(vec![field("a", DataType::Int32)])); + let target_type = DataType::FixedSizeList(target_field, 1); + + let result = cast_column(&source_col, &target_type, &DEFAULT_CAST_OPTIONS); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Cannot cast")); + } + + #[test] + fn test_cast_fixed_size_list_struct_non_nullable_field_fails() { + // Build a FixedSizeList and try to cast to + // FixedSizeList (should fail). + let struct_arr = StructArray::from(vec![( + arc_field("a", DataType::Int32), + Arc::new(Int32Array::from(vec![1])) as ArrayRef, + )]); + + let source_field = + arc_field("item", struct_type(vec![field("a", DataType::Int32)])); + use arrow::array::FixedSizeListArray; + let source_list = + FixedSizeListArray::new(source_field, 1, Arc::new(struct_arr), None); + let source_col: ArrayRef = Arc::new(source_list); + + let target_field = arc_field( + "item", + struct_type(vec![ + field("a", DataType::Int32), + non_null_field("b", DataType::Int32), + ]), + ); + let target_type = DataType::FixedSizeList(target_field, 1); + + let result = cast_column(&source_col, &target_type, &DEFAULT_CAST_OPTIONS); + assert!(result.is_err()); + let error = result.unwrap_err().to_string(); + assert!( + error.contains("cannot fill with NULL") || error.contains("non-nullable") + ); + } + + #[test] + fn test_cast_fixed_size_list_size_mismatch_fails() { + use arrow::array::FixedSizeListArray; + + let values = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef; + let source_list = + FixedSizeListArray::new(arc_field("item", DataType::Int32), 1, values, None); + let source_col: ArrayRef = Arc::new(source_list); + + let target_type = DataType::FixedSizeList(arc_field("item", DataType::Int32), 2); + + let result = cast_column(&source_col, &target_type, &DEFAULT_CAST_OPTIONS); + assert!(result.is_err()); + assert_contains!( + result.unwrap_err().to_string(), + "Cannot cast FixedSizeList column with size 1 to size 2" + ); + } + + #[test] + fn test_requires_nested_struct_cast_fixed_size_list() { + let s1 = struct_type(vec![field("a", DataType::Int32)]); + let s2 = struct_type(vec![field("a", DataType::Int64)]); + + assert!(requires_nested_struct_cast( + &DataType::FixedSizeList(arc_field("item", s1.clone()), 2), + &DataType::FixedSizeList(arc_field("item", s2.clone()), 2), + )); + + // FixedSizeList with non-struct inner types should return false + assert!(!requires_nested_struct_cast( + &DataType::FixedSizeList(arc_field("item", DataType::Int32), 2), + &DataType::FixedSizeList(arc_field("item", DataType::Int64), 2), + )); + } } diff --git a/datafusion/core/tests/parquet/expr_adapter.rs b/datafusion/core/tests/parquet/expr_adapter.rs index cf32efbd702fd..82d73d2531be0 100644 --- a/datafusion/core/tests/parquet/expr_adapter.rs +++ b/datafusion/core/tests/parquet/expr_adapter.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use arrow::array::{ - Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, - RecordBatch, StringArray, StructArray, record_batch, + Array, ArrayRef, BooleanArray, FixedSizeListArray, Int32Array, Int64Array, + LargeListArray, ListArray, RecordBatch, StringArray, StructArray, record_batch, }; use arrow::buffer::OffsetBuffer; use arrow::compute::concat_batches; @@ -60,13 +60,19 @@ async fn write_parquet(batch: RecordBatch, store: Arc, path: &s enum NestedListKind { List, LargeList, + FixedSizeList, } +const FIXED_SIZE_LIST_LEN: usize = 2; + impl NestedListKind { fn field_data_type(self, item_field: Arc) -> DataType { match self { Self::List => DataType::List(item_field), Self::LargeList => DataType::LargeList(item_field), + Self::FixedSizeList => { + DataType::FixedSizeList(item_field, FIXED_SIZE_LIST_LEN as i32) + } } } @@ -89,6 +95,19 @@ impl NestedListKind { values, None, )), + Self::FixedSizeList => { + assert_eq!( + lengths.as_slice(), + &[FIXED_SIZE_LIST_LEN], + "FixedSizeList fixtures must contain exactly {FIXED_SIZE_LIST_LEN} elements per row" + ); + Arc::new(FixedSizeListArray::new( + item_field, + FIXED_SIZE_LIST_LEN as i32, + values, + None, + )) + } } } @@ -96,6 +115,7 @@ impl NestedListKind { match self { Self::List => "list", Self::LargeList => "large_list", + Self::FixedSizeList => "fixed_size_list", } } } @@ -277,7 +297,8 @@ fn nested_list_table_schema( } // Helper to extract message values from a nested list column. -// Returns the values at indices 0 and 1 from either a ListArray or LargeListArray. +// Returns the values at indices 0 and 1 from either a ListArray, LargeListArray, +// or FixedSizeListArray. fn extract_nested_list_values( kind: NestedListKind, column: &ArrayRef, @@ -297,7 +318,50 @@ fn extract_nested_list_values( .expect("messages should be a LargeListArray"); (list.value(0), list.value(1)) } + NestedListKind::FixedSizeList => { + let list = column + .as_any() + .downcast_ref::() + .expect("messages should be a FixedSizeListArray"); + (list.value(0), list.value(1)) + } + } +} + +fn evolved_messages(kind: NestedListKind) -> Vec> { + let mut messages = vec![NestedMessageRow { + id: 30, + name: "gamma", + chain: Some("eth"), + ignored: Some(99), + }]; + if matches!(kind, NestedListKind::FixedSizeList) { + messages.push(NestedMessageRow { + id: 40, + name: "delta", + chain: Some("doge"), + ignored: Some(100), + }); + } + messages +} + +fn error_messages(kind: NestedListKind) -> Vec> { + let mut messages = vec![NestedMessageRow { + id: 10, + name: "alpha", + chain: Some("eth"), + ignored: None, + }]; + if matches!(kind, NestedListKind::FixedSizeList) { + messages.push(NestedMessageRow { + id: 20, + name: "beta", + chain: Some("doge"), + ignored: None, + }); } + messages } // Helper to set up a nested list test fixture. @@ -352,15 +416,11 @@ async fn assert_nested_list_struct_schema_evolution(kind: NestedListKind) -> Res ); // new.parquet shape: messages item struct adds nullable `chain` and extra `ignored`. + let new_messages = evolved_messages(kind); let new_batch = nested_messages_batch( kind, 2, - &[NestedMessageRow { - id: 30, - name: "gamma", - chain: Some("eth"), - ignored: Some(99), - }], + &new_messages, &message_fields(DataType::Utf8, true, true, true), ); @@ -429,7 +489,12 @@ async fn assert_nested_list_struct_schema_evolution(kind: NestedListKind) -> Res .as_any() .downcast_ref::() .unwrap(); - assert_eq!(new_chain.iter().collect::>(), vec![Some("eth")]); + let expected_new_chain = if matches!(kind, NestedListKind::FixedSizeList) { + vec![Some("eth"), Some("doge")] + } else { + vec![Some("eth")] + }; + assert_eq!(new_chain.iter().collect::>(), expected_new_chain); let projected = ctx .sql( @@ -863,12 +928,12 @@ async fn test_struct_schema_evolution_projection_and_filter() -> Result<()> { Ok(()) } -/// Macro to generate paired test functions for List and LargeList variants. -/// Expands to two `#[tokio::test]` functions with the specified names. -macro_rules! test_struct_schema_evolution_pair { +/// Macro to generate schema evolution tests for list-like variants. +macro_rules! test_struct_schema_evolution_variants { ( list: $list_test:ident, large_list: $large_list_test:ident, + fixed_size_list: $fixed_size_list_test:ident, fn: $assertion_fn:path $(, args: $($arg:expr),+)? ) => { #[tokio::test] @@ -880,10 +945,16 @@ macro_rules! test_struct_schema_evolution_pair { async fn $large_list_test() { $assertion_fn(NestedListKind::LargeList $(, $($arg),+)?).await; } + + #[tokio::test] + async fn $fixed_size_list_test() { + $assertion_fn(NestedListKind::FixedSizeList $(, $($arg),+)?).await; + } }; ( list: $list_test:ident, large_list: $large_list_test:ident, + fixed_size_list: $fixed_size_list_test:ident, fn_result: $assertion_fn:path ) => { #[tokio::test] @@ -895,12 +966,18 @@ macro_rules! test_struct_schema_evolution_pair { async fn $large_list_test() -> Result<()> { $assertion_fn(NestedListKind::LargeList).await } + + #[tokio::test] + async fn $fixed_size_list_test() -> Result<()> { + $assertion_fn(NestedListKind::FixedSizeList).await + } }; } -test_struct_schema_evolution_pair!( +test_struct_schema_evolution_variants!( list: test_list_struct_schema_evolution_end_to_end, large_list: test_large_list_struct_schema_evolution_end_to_end, + fixed_size_list: test_fixed_size_list_struct_schema_evolution_end_to_end, fn_result: assert_nested_list_struct_schema_evolution ); @@ -910,15 +987,11 @@ async fn assert_nested_list_struct_schema_evolution_errors( chain_nullable: bool, expected_error: &str, ) { + let messages = error_messages(kind); let batch = nested_messages_batch( kind, 1, - &[NestedMessageRow { - id: 10, - name: "alpha", - chain: Some("eth"), - ignored: None, - }], + &messages, &message_fields(DataType::Utf8, true, true, false), ); @@ -970,15 +1043,17 @@ fn incompatible_chain_type() -> DataType { DataType::Struct(vec![Arc::new(Field::new("value", DataType::Utf8, true))].into()) } -test_struct_schema_evolution_pair!( +test_struct_schema_evolution_variants!( list: test_list_struct_schema_evolution_non_nullable_missing_field_fails, large_list: test_large_list_struct_schema_evolution_non_nullable_missing_field_fails, + fixed_size_list: test_fixed_size_list_struct_schema_evolution_non_nullable_missing_field_fails, fn: assert_non_nullable_missing_chain_field_fails ); -test_struct_schema_evolution_pair!( +test_struct_schema_evolution_variants!( list: test_list_struct_schema_evolution_incompatible_field_fails, large_list: test_large_list_struct_schema_evolution_incompatible_field_fails, + fixed_size_list: test_fixed_size_list_struct_schema_evolution_incompatible_field_fails, fn: assert_incompatible_chain_field_fails ); diff --git a/datafusion/expr-common/src/columnar_value.rs b/datafusion/expr-common/src/columnar_value.rs index bc6b8177ab3cf..a225f27854875 100644 --- a/datafusion/expr-common/src/columnar_value.rs +++ b/datafusion/expr-common/src/columnar_value.rs @@ -317,6 +317,8 @@ fn cast_array_by_name( array.data_type(), cast_type, ) { + // Planning uses the same predicate before building the physical cast, + // so this branch must remain the runtime mirror of that validation. datafusion_common::nested_struct::cast_column(array, cast_type, cast_options) } else { ensure_date_array_timestamp_bounds(array, cast_type)?; diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 24e486f8050fe..82a4a9de87823 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -321,11 +321,7 @@ pub fn cast_with_options( Ok(Arc::clone(&expr)) } else if requires_nested_struct_cast(&expr_type, &cast_type) { if can_cast_named_struct_types(&expr_type, &cast_type) { - // Allow casts involving structs (including nested inside Lists, Dictionaries, - // etc.) that pass name-based compatibility validation. This validation is - // applied at planning time (now) to fail fast, rather than deferring errors - // to execution time. The name-based casting logic will be executed at runtime - // via ColumnarValue::cast_to. + // Keep this planner check in sync with ColumnarValue::cast_to runtime routing. Ok(Arc::new(CastExpr::new(expr, cast_type, cast_options))) } else { not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}") @@ -973,6 +969,30 @@ mod tests { Ok(()) } + #[test] + fn fixed_size_list_struct_size_mismatch_fails_at_planning() -> Result<()> { + let fixed_size_list = |inner_type, size| { + FixedSizeList(Arc::new(Field::new("item", inner_type, true)), size) + }; + + let source_type = fixed_size_list( + Struct(Fields::from(vec![Arc::new(Field::new("x", Int32, true))])), + 1, + ); + let schema = Schema::new(vec![Field::new("a", source_type, true)]); + + let invalid_target = fixed_size_list( + Struct(Fields::from(vec![Arc::new(Field::new("x", Int64, true))])), + 2, + ); + + let err = cast_with_options(col("a", &schema)?, &schema, invalid_target, None) + .expect_err("fixed-size-list size mismatch should fail during planning"); + assert!(err.to_string().contains("Unsupported CAST")); + + Ok(()) + } + #[test] #[ignore] // TODO: https://github.com/apache/datafusion/issues/5396 fn test_cast_decimal() -> Result<()> { diff --git a/datafusion/sqllogictest/test_files/schema_evolution_nested.slt b/datafusion/sqllogictest/test_files/schema_evolution_nested.slt index 53bc16fe51508..d19aa1948fa0c 100644 --- a/datafusion/sqllogictest/test_files/schema_evolution_nested.slt +++ b/datafusion/sqllogictest/test_files/schema_evolution_nested.slt @@ -122,3 +122,61 @@ FROM large_list_messages; ---- 1 10 NULL 2 30 eth + +statement ok +CREATE EXTERNAL TABLE fixed_size_list_messages ( + row_id INT, + messages STRUCT[2] +) +STORED AS PARQUET +LOCATION 'test_files/scratch/schema_evolution_nested/fixed_size_list_messages/'; + +statement ok +COPY ( + SELECT + 1 AS row_id, + arrow_cast( + [ + named_struct('id', 10, 'name', 'alpha'), + named_struct('id', 20, 'name', 'beta') + ], + 'FixedSizeList(2, Struct("id": Int64, "name": Utf8View))' + ) AS messages +) TO 'test_files/scratch/schema_evolution_nested/fixed_size_list_messages/old.parquet' +STORED AS PARQUET; + +statement ok +COPY ( + SELECT + 2 AS row_id, + arrow_cast( + [ + named_struct('id', 30, 'name', 'gamma', 'chain', 'eth'), + named_struct('id', 40, 'name', 'delta', 'chain', 'doge') + ], + 'FixedSizeList(2, Struct("id": Int64, "name": Utf8View, "chain": Utf8View))' + ) AS messages +) TO 'test_files/scratch/schema_evolution_nested/fixed_size_list_messages/new.parquet' +STORED AS PARQUET; + +query TTT +DESCRIBE fixed_size_list_messages; +---- +row_id Int32 YES +messages FixedSizeList(2 x Struct("id": Int64, "name": Utf8View, "chain": Utf8View)) YES + +query I? +SELECT row_id, messages FROM fixed_size_list_messages ORDER BY row_id; +---- +1 [{id: 10, name: alpha, chain: NULL}, {id: 20, name: beta, chain: NULL}] +2 [{id: 30, name: gamma, chain: eth}, {id: 40, name: delta, chain: doge}] + +query IIT rowsort +SELECT + row_id, + get_field(messages[1], 'id') AS msg_id, + get_field(messages[1], 'chain') AS chain +FROM fixed_size_list_messages; +---- +1 10 NULL +2 30 eth