From f3fdb5a4dc7e390e9adc73e8091d2c1929991253 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 17 May 2026 11:28:31 -0600 Subject: [PATCH 1/5] feat: optional timezone for coerce_int96 Add TableParquetOptions.coerce_int96_tz and a new coerce_int96_to_resolution_with_tz function that, when given a timezone, produces Timestamp(unit, Some(tz)) for INT96-derived columns instead of the default Timestamp(unit, None). The existing coerce_int96_to_resolution keeps its signature and forwards None, so this change is API-compatible (no parameter-count change on any existing public function). Threads the new option through ParquetSource, ParquetOpener, and DFParquetMetadata. Spark and other systems write INT96 as UTC-adjusted instants, so downstream readers that need the resulting Arrow type to carry the LTZ signal (for example, Comet's schema adapter rejecting INT96 -> TimestampNTZ on Spark 3.x) can set this to "UTC". Default behavior is unchanged when the option is unset. The new option round-trips through both `datafusion-proto-common` (`ParquetOptions.coerce_int96_tz_opt` oneof) and `datafusion-proto` (reuses the common message); covered by three new tests in `from_proto::tests`. --- datafusion/common/src/config.rs | 10 ++++++ .../common/src/file_options/parquet_writer.rs | 3 ++ .../datasource-parquet/src/file_format.rs | 25 +++++++++++-- datafusion/datasource-parquet/src/metadata.rs | 14 ++++++-- datafusion/datasource-parquet/src/opener.rs | 12 +++++-- datafusion/datasource-parquet/src/source.rs | 7 ++++ .../proto/datafusion_common.proto | 8 +++++ datafusion/proto-common/src/from_proto/mod.rs | 36 +++++++++++++++++++ .../proto-common/src/generated/pbjson.rs | 22 ++++++++++++ .../proto-common/src/generated/prost.rs | 15 ++++++++ datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 15 ++++++++ .../proto/src/logical_plan/file_formats.rs | 6 ++++ .../test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 1 + 15 files changed, 171 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 7d32f2a88fd9c..8d58c5ace4b27 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -908,6 +908,16 @@ config_namespace! { /// nanosecond resolution. pub coerce_int96: Option, transform = str::to_lowercase, default = None + /// (reading) Optional timezone applied to INT96 columns when `coerce_int96` + /// is set. When `Some`, INT96 columns coerce to + /// `Timestamp(, Some())` instead of the default + /// `Timestamp(, None)`. Spark and other systems write INT96 + /// values as UTC-adjusted instants, so callers that need the resulting + /// Arrow type to be timezone-aware (e.g. for Spark `TimestampType` + /// semantics) should set this to `"UTC"`. No effect when `coerce_int96` + /// is `None`. + pub coerce_int96_tz: Option, default = None + /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index eaf5a1642e8e2..3f827fbfa75a0 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -208,6 +208,7 @@ impl ParquetOptions { schema_force_view_types: _, binary_as_string: _, // not used for writer props coerce_int96: _, // not used for writer props + coerce_int96_tz: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, } = self; @@ -482,6 +483,7 @@ mod tests { binary_as_string: defaults.binary_as_string, skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, + coerce_int96_tz: None, max_predicate_cache_size: defaults.max_predicate_cache_size, use_content_defined_chunking: defaults.use_content_defined_chunking.clone(), } @@ -600,6 +602,7 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + coerce_int96_tz: None, use_content_defined_chunking: props.content_defined_chunking().map(|c| { CdcOptions { min_chunk_size: c.min_chunk_size, diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 5676ee940c7b8..74cf34ff3e495 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -713,10 +713,31 @@ pub fn apply_file_schema_type_coercions( } /// Coerces the file schema's Timestamps to the provided TimeUnit if Parquet schema contains INT96. +/// +/// Equivalent to calling [`coerce_int96_to_resolution_with_tz`] with `timezone: None`, +/// producing `Timestamp(time_unit, None)` for INT96-derived columns (the historical +/// default). Use [`coerce_int96_to_resolution_with_tz`] to attach a timezone. pub fn coerce_int96_to_resolution( parquet_schema: &SchemaDescriptor, file_schema: &Schema, time_unit: &TimeUnit, +) -> Option { + coerce_int96_to_resolution_with_tz(parquet_schema, file_schema, time_unit, None) +} + +/// Coerces the file schema's Timestamps to the provided TimeUnit if Parquet schema contains INT96. +/// +/// When `timezone` is `Some`, INT96-derived columns coerce to +/// `Timestamp(time_unit, Some(timezone))`; otherwise they coerce to +/// `Timestamp(time_unit, None)` (the historical default). Spark and other +/// systems write INT96 as UTC-adjusted instants, so callers that need the +/// resulting Arrow type to be timezone-aware should pass +/// `Some(&Arc::from("UTC"))`. +pub fn coerce_int96_to_resolution_with_tz( + parquet_schema: &SchemaDescriptor, + file_schema: &Schema, + time_unit: &TimeUnit, + timezone: Option<&Arc>, ) -> Option { // Traverse the parquet_schema columns looking for int96 physical types. If encountered, insert // the field's full path into a set. @@ -879,11 +900,11 @@ pub fn coerce_int96_to_resolution( (DataType::Timestamp(TimeUnit::Nanosecond, None), None) if int96_fields.contains(parquet_path.concat().as_str()) => // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct - // time_unit. + // time_unit, optionally attaching the requested timezone. { parent_fields.borrow_mut().push(field_with_new_type( current_field, - DataType::Timestamp(*time_unit, None), + DataType::Timestamp(*time_unit, timezone.cloned()), )); } // Other types can be cloned as they are. diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 70ac3fe4987c0..276eb0e7115e2 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -18,7 +18,7 @@ //! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics //! and schema information. -use crate::{apply_file_schema_type_coercions, coerce_int96_to_resolution}; +use crate::{apply_file_schema_type_coercions, coerce_int96_to_resolution_with_tz}; use arrow::array::{Array, ArrayRef, BooleanArray}; use arrow::compute::and; use arrow::compute::kernels::cmp::eq; @@ -72,6 +72,8 @@ pub struct DFParquetMetadata<'a> { file_metadata_cache: Option>, /// timeunit to coerce INT96 timestamps to pub coerce_int96: Option, + /// Optional timezone applied to INT96-coerced timestamps. + pub coerce_int96_tz: Option>, } impl<'a> DFParquetMetadata<'a> { @@ -83,6 +85,7 @@ impl<'a> DFParquetMetadata<'a> { decryption_properties: None, file_metadata_cache: None, coerce_int96: None, + coerce_int96_tz: None, } } @@ -116,6 +119,12 @@ impl<'a> DFParquetMetadata<'a> { self } + /// Set the optional timezone applied to INT96-coerced timestamps. + pub fn with_coerce_int96_tz(mut self, timezone: Option>) -> Self { + self.coerce_int96_tz = timezone; + self + } + /// Fetch parquet metadata from the remote object store pub async fn fetch_metadata(&self) -> Result> { // implementation to fetch parquet metadata @@ -218,10 +227,11 @@ impl<'a> DFParquetMetadata<'a> { .coerce_int96 .as_ref() .and_then(|time_unit| { - coerce_int96_to_resolution( + coerce_int96_to_resolution_with_tz( file_metadata.schema_descr(), &schema, time_unit, + self.coerce_int96_tz.as_ref(), ) }) .unwrap_or(schema); diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 11cf786a3d6b7..e7be4787168c7 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,7 +24,7 @@ use crate::row_filter::{RowFilterGenerator, build_projection_read_plan}; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, - apply_file_schema_type_coercions, coerce_int96_to_resolution, + apply_file_schema_type_coercions, coerce_int96_to_resolution_with_tz, }; use arrow::array::RecordBatch; use arrow::datatypes::DataType; @@ -119,6 +119,10 @@ pub(super) struct ParquetMorselizer { pub enable_row_group_stats_pruning: bool, /// Coerce INT96 timestamps to specific TimeUnit pub coerce_int96: Option, + /// Optional timezone applied to INT96-coerced timestamps. When `Some`, the + /// coerced column type becomes `Timestamp(, Some())`. + /// No effect when `coerce_int96` is `None`. + pub coerce_int96_tz: Option>, /// Optional parquet FileDecryptionProperties #[cfg(feature = "parquet_encryption")] pub file_decryption_properties: Option>, @@ -281,6 +285,7 @@ struct PreparedParquetOpen { enable_row_group_stats_pruning: bool, limit: Option, coerce_int96: Option, + coerce_int96_tz: Option>, expr_adapter_factory: Arc, predicate_creation_errors: Count, max_predicate_cache_size: Option, @@ -651,6 +656,7 @@ impl ParquetMorselizer { enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, limit: self.limit, coerce_int96: self.coerce_int96, + coerce_int96_tz: self.coerce_int96_tz.clone(), expr_adapter_factory: Arc::clone(&self.expr_adapter_factory), predicate_creation_errors, max_predicate_cache_size: self.max_predicate_cache_size, @@ -778,10 +784,11 @@ impl MetadataLoadedParquetOpen { } if let Some(ref coerce) = prepared.coerce_int96 - && let Some(merged) = coerce_int96_to_resolution( + && let Some(merged) = coerce_int96_to_resolution_with_tz( reader_metadata.parquet_schema(), &physical_file_schema, coerce, + prepared.coerce_int96_tz.as_ref(), ) { physical_file_schema = Arc::new(merged); @@ -1748,6 +1755,7 @@ mod test { enable_bloom_filter: self.enable_bloom_filter, enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, coerce_int96: self.coerce_int96, + coerce_int96_tz: None, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index cf1fcea005dde..c1829fcbccace 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -557,6 +557,12 @@ impl FileSource for ParquetSource { .coerce_int96 .as_ref() .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); + let coerce_int96_tz = self + .table_parquet_options + .global + .coerce_int96_tz + .as_ref() + .map(|tz| Arc::::from(tz.as_str())); Ok(Box::new(ParquetMorselizer { partition_index: partition, @@ -578,6 +584,7 @@ impl FileSource for ParquetSource { enable_bloom_filter: self.bloom_filter_on_read(), enable_row_group_stats_pruning: self.table_parquet_options.global.pruning, coerce_int96, + coerce_int96_tz, #[cfg(feature = "parquet_encryption")] file_decryption_properties, expr_adapter_factory, diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 5f4ba2b9acebd..684d9a2612408 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -628,6 +628,14 @@ message ParquetOptions { } CdcOptions content_defined_chunking = 35; + + // Optional timezone applied to INT96-coerced timestamps when `coerce_int96` + // is set. When `Some`, INT96 columns coerce to + // `Timestamp(, Some())` instead of the default + // `Timestamp(, None)`. No effect when `coerce_int96` is unset. + oneof coerce_int96_tz_opt { + string coerce_int96_tz = 36; + } } message CdcOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ef2a8c18470c4..94a06bcc13bbd 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1123,6 +1123,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt { protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v), }).unwrap_or(None), + coerce_int96_tz: value.coerce_int96_tz_opt.clone().map(|opt| match opt { + protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(v) => Some(v), + }).unwrap_or(None), skip_arrow_metadata: value.skip_arrow_metadata, max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), @@ -1350,6 +1353,39 @@ mod tests { assert_eq!(opts, recovered); } + #[test] + fn test_parquet_options_coerce_int96_tz_unset_round_trip() { + let opts = ParquetOptions::default(); + assert!(opts.coerce_int96_tz.is_none()); + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.coerce_int96_tz, None); + } + + #[test] + fn test_parquet_options_coerce_int96_tz_set_round_trip() { + let opts = ParquetOptions { + coerce_int96: Some("us".to_string()), + coerce_int96_tz: Some("UTC".to_string()), + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.coerce_int96, Some("us".to_string())); + assert_eq!(recovered.coerce_int96_tz, Some("UTC".to_string())); + } + + #[test] + fn test_table_parquet_options_coerce_int96_tz_round_trip() { + let mut opts = TableParquetOptions::default(); + opts.global.coerce_int96 = Some("us".to_string()); + opts.global.coerce_int96_tz = Some("America/Los_Angeles".to_string()); + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.global.coerce_int96, Some("us".to_string())); + assert_eq!( + recovered.global.coerce_int96_tz, + Some("America/Los_Angeles".to_string()) + ); + } + #[test] fn test_parquet_options_cdc_enabled_round_trip() { let opts = ParquetOptions { diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index f6b5bbeaf3961..0568982e97a44 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -6171,6 +6171,9 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } + if self.coerce_int96_tz_opt.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -6339,6 +6342,13 @@ impl serde::Serialize for ParquetOptions { } } } + if let Some(v) = self.coerce_int96_tz_opt.as_ref() { + match v { + parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(v) => { + struct_ser.serialize_field("coerceInt96Tz", v)?; + } + } + } struct_ser.end() } } @@ -6412,6 +6422,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", + "coerce_int96_tz", + "coerceInt96Tz", ]; #[allow(clippy::enum_variant_names)] @@ -6449,6 +6461,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, + CoerceInt96Tz, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6503,6 +6516,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), + "coerceInt96Tz" | "coerce_int96_tz" => Ok(GeneratedField::CoerceInt96Tz), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6555,6 +6569,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; + let mut coerce_int96_tz_opt__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::EnablePageIndex => { @@ -6769,6 +6784,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } + GeneratedField::CoerceInt96Tz => { + if coerce_int96_tz_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("coerceInt96Tz")); + } + coerce_int96_tz_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_options::CoerceInt96TzOpt::CoerceInt96Tz); + } } } Ok(ParquetOptions { @@ -6805,6 +6826,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, + coerce_int96_tz_opt: coerce_int96_tz_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index f09af1db349e4..632b16929faa6 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -900,6 +900,12 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96` + /// is set. When `Some`, INT96 columns coerce to + /// `Timestamp(, Some())` instead of the default + /// `Timestamp(, None)`. No effect when `coerce_int96` is unset. + #[prost(oneof = "parquet_options::CoerceInt96TzOpt", tags = "36")] + pub coerce_int96_tz_opt: ::core::option::Option, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -958,6 +964,15 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96` + /// is set. When `Some`, INT96 columns coerce to + /// `Timestamp(, Some())` instead of the default + /// `Timestamp(, None)`. No effect when `coerce_int96` is unset. + #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CoerceInt96TzOpt { + #[prost(string, tag = "36")] + CoerceInt96Tz(::prost::alloc::string::String), + } } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct CdcOptions { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index dca95746b4631..940679b836ff1 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -936,6 +936,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { binary_as_string: value.binary_as_string, skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), + coerce_int96_tz_opt: value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc| protobuf::CdcOptions { diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index f09af1db349e4..632b16929faa6 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -900,6 +900,12 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96` + /// is set. When `Some`, INT96 columns coerce to + /// `Timestamp(, Some())` instead of the default + /// `Timestamp(, None)`. No effect when `coerce_int96` is unset. + #[prost(oneof = "parquet_options::CoerceInt96TzOpt", tags = "36")] + pub coerce_int96_tz_opt: ::core::option::Option, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -958,6 +964,15 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96` + /// is set. When `Some`, INT96 columns coerce to + /// `Timestamp(, Some())` instead of the default + /// `Timestamp(, None)`. No effect when `coerce_int96` is unset. + #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CoerceInt96TzOpt { + #[prost(string, tag = "36")] + CoerceInt96Tz(::prost::alloc::string::String), + } } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct CdcOptions { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index a050f5fba2061..683b6a612a53f 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -447,6 +447,9 @@ mod parquet { coerce_int96_opt: global_options.global.coerce_int96.map(|compression| { parquet_options::CoerceInt96Opt::CoerceInt96(compression) }), + coerce_int96_tz_opt: global_options.global.coerce_int96_tz.map(|tz| { + parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) + }), max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), @@ -553,6 +556,9 @@ mod parquet { coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt { parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(), }), + coerce_int96_tz: proto.coerce_int96_tz_opt.as_ref().map(|opt| match opt { + parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(tz) => tz.clone(), + }), max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 8396a60137ee1..2c7db0ea10cc4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -236,6 +236,7 @@ datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true datafusion.execution.parquet.bloom_filter_on_write false datafusion.execution.parquet.coerce_int96 NULL +datafusion.execution.parquet.coerce_int96_tz NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) datafusion.execution.parquet.created_by datafusion @@ -383,6 +384,7 @@ datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter n datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. +datafusion.execution.parquet.coerce_int96_tz NULL (reading) Optional timezone applied to INT96 columns when `coerce_int96` is set. When `Some`, INT96 columns coerce to `Timestamp(, Some())` instead of the default `Timestamp(, None)`. Spark and other systems write INT96 values as UTC-adjusted instants, so callers that need the resulting Arrow type to be timezone-aware (e.g. for Spark `TimestampType` semantics) should set this to `"UTC"`. No effect when `coerce_int96` is `None`. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. datafusion.execution.parquet.created_by datafusion (writing) Sets "created by" property diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f7f9426e2bd32..ccf6f8d52ce77 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -90,6 +90,7 @@ The following configuration settings are available: | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | +| datafusion.execution.parquet.coerce_int96_tz | NULL | (reading) Optional timezone applied to INT96 columns when `coerce_int96` is set. When `Some`, INT96 columns coerce to `Timestamp(, Some())` instead of the default `Timestamp(, None)`. Spark and other systems write INT96 values as UTC-adjusted instants, so callers that need the resulting Arrow type to be timezone-aware (e.g. for Spark `TimestampType` semantics) should set this to `"UTC"`. No effect when `coerce_int96` is `None`. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.max_predicate_cache_size | NULL | (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | From f9914bb324e33f2b73c721d47cbbd9b5b744296f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 May 2026 08:45:54 -0600 Subject: [PATCH 2/5] chore: address review feedback on coerce_int96_tz - Add unit test exercising coerce_int96_to_resolution_with_tz directly - Validate coerce_int96_tz via arrow::array::timezone::Tz at the source boundary so invalid values error at config-resolution instead of in compute kernels - Warn when coerce_int96_tz is set without coerce_int96, since the timezone is otherwise a silent no-op --- .../datasource-parquet/src/file_format.rs | 60 +++++++++++++++++++ datafusion/datasource-parquet/src/source.rs | 17 +++++- 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 74cf34ff3e495..747b7739c0138 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1903,6 +1903,66 @@ mod tests { assert_eq!(result, expected_schema); } + #[test] + fn coerce_int96_to_resolution_with_tz_applies_timezone() { + // Same input schema as `coerce_int96_to_resolution_with_mixed_timestamps`, but with a + // non-empty `timezone` argument. Only c0 (the INT96 column) should pick up the timezone; + // the other timestamp columns must keep whatever timezone they were declared with. + let spark_schema = " + message spark_schema { + optional int96 c0; + optional int64 c1 (TIMESTAMP(NANOS,true)); + optional int64 c2 (TIMESTAMP(NANOS,false)); + optional int64 c3 (TIMESTAMP(MILLIS,true)); + optional int64 c4 (TIMESTAMP(MILLIS,false)); + optional int64 c5 (TIMESTAMP(MICROS,true)); + optional int64 c6 (TIMESTAMP(MICROS,false)); + } + "; + + let schema = parse_message_type(spark_schema).expect("should parse schema"); + let descr = SchemaDescriptor::new(Arc::new(schema)); + + let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap(); + + let tz: Arc = Arc::from("UTC"); + let result = coerce_int96_to_resolution_with_tz( + &descr, + &arrow_schema, + &TimeUnit::Microsecond, + Some(&tz), + ) + .unwrap(); + + let expected_schema = Schema::new(vec![ + Field::new( + "c0", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + Field::new( + "c1", + DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), + true, + ), + Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true), + Field::new( + "c3", + DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), + true, + ), + Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true), + Field::new( + "c5", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true), + ]); + + assert_eq!(result, expected_schema); + } + #[test] fn coerce_int96_to_resolution_with_nested_types() { // This schema is derived from Comet's CometFuzzTestSuite ParquetGenerator only using int96 diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index c1829fcbccace..ef9c43176221c 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -32,6 +32,7 @@ use datafusion_datasource::as_file_source; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::morsel::Morselizer; +use arrow::array::timezone::Tz; use arrow::datatypes::TimeUnit; use datafusion_common::DataFusionError; use datafusion_common::config::TableParquetOptions; @@ -52,6 +53,7 @@ use datafusion_physical_plan::filter_pushdown::{ }; use datafusion_physical_plan::metrics::Count; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use log::warn; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; @@ -562,7 +564,20 @@ impl FileSource for ParquetSource { .global .coerce_int96_tz .as_ref() - .map(|tz| Arc::::from(tz.as_str())); + .map(|tz| { + tz.parse::().map_err(|e| { + DataFusionError::Configuration(format!( + "Invalid parquet coerce_int96_tz {tz:?}: {e}" + )) + })?; + Ok::<_, DataFusionError>(Arc::::from(tz.as_str())) + }) + .transpose()?; + if coerce_int96_tz.is_some() && coerce_int96.is_none() { + warn!( + "coerce_int96_tz is set but coerce_int96 is not; the timezone will be ignored" + ); + } Ok(Box::new(ParquetMorselizer { partition_index: partition, From 3a8e248942af50957dfd64d6eab44fd3a1c75f87 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 May 2026 15:01:38 -0600 Subject: [PATCH 3/5] Update datafusion/common/src/config.rs Co-authored-by: Oleks V --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 8d58c5ace4b27..d73e389aba1f3 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -916,7 +916,7 @@ config_namespace! { /// Arrow type to be timezone-aware (e.g. for Spark `TimestampType` /// semantics) should set this to `"UTC"`. No effect when `coerce_int96` /// is `None`. - pub coerce_int96_tz: Option, default = None + pub coerce_int96_tz: Option, transform = str::to_lowercase, default = None /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true From 916029dac31a52c3240270bbc8427f0b922778c7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 May 2026 15:11:09 -0600 Subject: [PATCH 4/5] refactor: introduce Int96Coercer builder; revert tz lowercasing - Replace coerce_int96_to_resolution_with_tz with an Int96Coercer builder per @alamb's suggestion. The existing coerce_int96_to_resolution stays as a deprecated thin wrapper for backwards compatibility. - Revert the str::to_lowercase transform on coerce_int96_tz: IANA timezones are case-sensitive (verified that Tz::from_str rejects "utc" and "america/los_angeles"), so lowercasing would break valid values. - Add a comment on the coerce_int96_tz: None default in the opener.rs test builder. --- datafusion/common/src/config.rs | 2 +- .../datasource-parquet/src/file_format.rs | 124 ++++++++++++++---- datafusion/datasource-parquet/src/metadata.rs | 11 +- datafusion/datasource-parquet/src/opener.rs | 13 +- 4 files changed, 111 insertions(+), 39 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d73e389aba1f3..8d58c5ace4b27 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -916,7 +916,7 @@ config_namespace! { /// Arrow type to be timezone-aware (e.g. for Spark `TimestampType` /// semantics) should set this to `"UTC"`. No effect when `coerce_int96` /// is `None`. - pub coerce_int96_tz: Option, transform = str::to_lowercase, default = None + pub coerce_int96_tz: Option, default = None /// (reading) Use any available bloom filters when reading parquet files pub bloom_filter_on_read: bool, default = true diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 747b7739c0138..b4e66ea206eb5 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -712,28 +712,102 @@ pub fn apply_file_schema_type_coercions( )) } -/// Coerces the file schema's Timestamps to the provided TimeUnit if Parquet schema contains INT96. +/// Coerces the file schema's Timestamps to the provided TimeUnit if the +/// Parquet schema contains INT96. /// -/// Equivalent to calling [`coerce_int96_to_resolution_with_tz`] with `timezone: None`, -/// producing `Timestamp(time_unit, None)` for INT96-derived columns (the historical -/// default). Use [`coerce_int96_to_resolution_with_tz`] to attach a timezone. +/// Deprecated wrapper around [`Int96Coercer`]; use the builder directly +/// instead — it also supports attaching a timezone via +/// [`Int96Coercer::with_timezone`]. +#[deprecated(since = "53.2.0", note = "use `Int96Coercer` instead")] pub fn coerce_int96_to_resolution( parquet_schema: &SchemaDescriptor, file_schema: &Schema, time_unit: &TimeUnit, ) -> Option { - coerce_int96_to_resolution_with_tz(parquet_schema, file_schema, time_unit, None) + Int96Coercer::new(parquet_schema, file_schema, time_unit).coerce() } -/// Coerces the file schema's Timestamps to the provided TimeUnit if Parquet schema contains INT96. +/// Builder for coercing INT96-originated Timestamp columns in `file_schema` +/// to a specific [`TimeUnit`], optionally attaching a timezone. /// -/// When `timezone` is `Some`, INT96-derived columns coerce to -/// `Timestamp(time_unit, Some(timezone))`; otherwise they coerce to -/// `Timestamp(time_unit, None)` (the historical default). Spark and other -/// systems write INT96 as UTC-adjusted instants, so callers that need the -/// resulting Arrow type to be timezone-aware should pass -/// `Some(&Arc::from("UTC"))`. -pub fn coerce_int96_to_resolution_with_tz( +/// INT96 is the legacy Parquet representation that systems like Spark use for +/// timestamps. Arrow surfaces it as `Timestamp(Nanosecond, None)`, but the +/// underlying values are written as UTC-adjusted instants. Use this builder +/// to: +/// +/// - Coerce INT96-derived columns to a smaller [`TimeUnit`] (e.g. microseconds) +/// to extend the representable date range. +/// - Optionally attach a timezone so the resulting Arrow type carries the +/// timezone-aware semantic (`Timestamp(unit, Some(tz))`). Without a +/// timezone, INT96-derived columns become `Timestamp(unit, None)` — the +/// historical default. +/// +/// Returns `None` if `file_schema` contains no INT96-derived columns. +/// +/// # Example +/// +/// ```ignore +/// use std::sync::Arc; +/// use arrow::datatypes::TimeUnit; +/// use datafusion_datasource_parquet::Int96Coercer; +/// +/// let coerced = Int96Coercer::new(parquet_schema, file_schema, &TimeUnit::Microsecond) +/// .with_timezone(Some(Arc::from("UTC"))) +/// .coerce(); +/// ``` +pub struct Int96Coercer<'a> { + parquet_schema: &'a SchemaDescriptor, + file_schema: &'a Schema, + time_unit: &'a TimeUnit, + timezone: Option>, +} + +impl<'a> Int96Coercer<'a> { + /// Create a new builder. INT96-derived columns will coerce to + /// `Timestamp(time_unit, None)` unless [`Self::with_timezone`] is set. + pub fn new( + parquet_schema: &'a SchemaDescriptor, + file_schema: &'a Schema, + time_unit: &'a TimeUnit, + ) -> Self { + Self { + parquet_schema, + file_schema, + time_unit, + timezone: None, + } + } + + /// Attach a timezone to INT96-derived columns. When `Some`, INT96-derived + /// columns coerce to `Timestamp(time_unit, Some(timezone))` instead of + /// the default `Timestamp(time_unit, None)`. Spark and other systems + /// write INT96 as UTC-adjusted instants, so callers that need the + /// resulting Arrow type to be timezone-aware should pass + /// `Some(Arc::from("UTC"))`. + pub fn with_timezone(mut self, timezone: Option>) -> Self { + self.timezone = timezone; + self + } + + /// Run the coercion, returning the rewritten schema or `None` if + /// `file_schema` contains no INT96-derived columns. + pub fn coerce(self) -> Option { + let Self { + parquet_schema, + file_schema, + time_unit, + timezone, + } = self; + coerce_int96_to_resolution_impl( + parquet_schema, + file_schema, + time_unit, + timezone.as_ref(), + ) + } +} + +fn coerce_int96_to_resolution_impl( parquet_schema: &SchemaDescriptor, file_schema: &Schema, time_unit: &TimeUnit, @@ -1872,9 +1946,9 @@ mod tests { let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap(); - let result = - coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond) - .unwrap(); + let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond) + .coerce() + .unwrap(); // Only the first field (c0) should be converted to a microsecond timestamp because it's the // only timestamp that originated from an INT96. @@ -1925,14 +1999,10 @@ mod tests { let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap(); - let tz: Arc = Arc::from("UTC"); - let result = coerce_int96_to_resolution_with_tz( - &descr, - &arrow_schema, - &TimeUnit::Microsecond, - Some(&tz), - ) - .unwrap(); + let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond) + .with_timezone(Some(Arc::from("UTC"))) + .coerce() + .unwrap(); let expected_schema = Schema::new(vec![ Field::new( @@ -2019,9 +2089,9 @@ mod tests { let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap(); - let result = - coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond) - .unwrap(); + let result = Int96Coercer::new(&descr, &arrow_schema, &TimeUnit::Microsecond) + .coerce() + .unwrap(); let expected_schema = Schema::new(vec![ Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true), diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 276eb0e7115e2..c32e45935636f 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -18,7 +18,7 @@ //! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics //! and schema information. -use crate::{apply_file_schema_type_coercions, coerce_int96_to_resolution_with_tz}; +use crate::{Int96Coercer, apply_file_schema_type_coercions}; use arrow::array::{Array, ArrayRef, BooleanArray}; use arrow::compute::and; use arrow::compute::kernels::cmp::eq; @@ -227,12 +227,9 @@ impl<'a> DFParquetMetadata<'a> { .coerce_int96 .as_ref() .and_then(|time_unit| { - coerce_int96_to_resolution_with_tz( - file_metadata.schema_descr(), - &schema, - time_unit, - self.coerce_int96_tz.as_ref(), - ) + Int96Coercer::new(file_metadata.schema_descr(), &schema, time_unit) + .with_timezone(self.coerce_int96_tz.clone()) + .coerce() }) .unwrap_or(schema); Ok(schema) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index e7be4787168c7..a8c3ba5b14b2f 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -23,8 +23,8 @@ use crate::push_decoder::{DecoderBuilderConfig, PushDecoderStreamState}; use crate::row_filter::{RowFilterGenerator, build_projection_read_plan}; use crate::row_group_filter::{BloomFilterStatistics, RowGroupAccessPlanFilter}; use crate::{ - ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, - apply_file_schema_type_coercions, coerce_int96_to_resolution_with_tz, + Int96Coercer, ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, + apply_file_schema_type_coercions, }; use arrow::array::RecordBatch; use arrow::datatypes::DataType; @@ -784,12 +784,13 @@ impl MetadataLoadedParquetOpen { } if let Some(ref coerce) = prepared.coerce_int96 - && let Some(merged) = coerce_int96_to_resolution_with_tz( + && let Some(merged) = Int96Coercer::new( reader_metadata.parquet_schema(), &physical_file_schema, coerce, - prepared.coerce_int96_tz.as_ref(), ) + .with_timezone(prepared.coerce_int96_tz.clone()) + .coerce() { physical_file_schema = Arc::new(merged); options = options.with_schema(Arc::clone(&physical_file_schema)); @@ -1755,6 +1756,10 @@ mod test { enable_bloom_filter: self.enable_bloom_filter, enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, coerce_int96: self.coerce_int96, + // No tests currently exercise coerce_int96_tz; the existing + // coerce_int96 tests all expect the legacy `Timestamp(_, None)` + // output. If a future test needs to set a timezone, add a + // builder setter analogous to with_coerce_int96. coerce_int96_tz: None, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, From 49571f2409007bfb9dd0e449760d254329844225 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 18 May 2026 15:26:02 -0600 Subject: [PATCH 5/5] test: add end-to-end coerce_int96_tz coverage; fix infer_schema Adding the SLT test exposed that infer_schema was not threading coerce_int96_tz through, so CREATE EXTERNAL TABLE produced a declared schema of Timestamp(unit, None) while scans surfaced Timestamp(unit, Some(tz)). Thread the option through infer_schema and factor the Tz validation into a parse_coerce_int96_tz_string helper shared with create_morselizer. Also tighten the comment on the opener test builder's coerce_int96_tz default to point at the new SLT coverage. --- .../datasource-parquet/src/file_format.rs | 12 +++++- datafusion/datasource-parquet/src/opener.rs | 7 ++-- datafusion/datasource-parquet/src/source.rs | 22 +++++++---- .../sqllogictest/test_files/parquet.slt | 39 +++++++++++++++++++ 4 files changed, 67 insertions(+), 13 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index b4e66ea206eb5..93ac0c19e8701 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -63,7 +63,9 @@ use datafusion_session::Session; use crate::metadata::{DFParquetMetadata, lex_ordering_to_sorting_columns}; use crate::reader::CachedParquetFileReaderFactory; -use crate::source::{ParquetSource, parse_coerce_int96_string}; +use crate::source::{ + ParquetSource, parse_coerce_int96_string, parse_coerce_int96_tz_string, +}; use async_trait::async_trait; use bytes::Bytes; use datafusion_datasource::source::DataSourceExec; @@ -365,6 +367,13 @@ impl FileFormat for ParquetFormat { Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?), None => None, }; + let coerce_int96_tz = self + .options + .global + .coerce_int96_tz + .as_ref() + .map(|tz| parse_coerce_int96_tz_string(tz)) + .transpose()?; let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache(); @@ -382,6 +391,7 @@ impl FileFormat for ParquetFormat { .with_decryption_properties(file_decryption_properties) .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) .with_coerce_int96(coerce_int96) + .with_coerce_int96_tz(coerce_int96_tz.clone()) .fetch_schema_with_location() .await?; Ok::<_, DataFusionError>(result) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index a8c3ba5b14b2f..bcf89b5ea2c49 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1756,10 +1756,9 @@ mod test { enable_bloom_filter: self.enable_bloom_filter, enable_row_group_stats_pruning: self.enable_row_group_stats_pruning, coerce_int96: self.coerce_int96, - // No tests currently exercise coerce_int96_tz; the existing - // coerce_int96 tests all expect the legacy `Timestamp(_, None)` - // output. If a future test needs to set a timezone, add a - // builder setter analogous to with_coerce_int96. + // End-to-end coercion behavior (including timezone) is + // covered by parquet.slt. No opener-level test currently + // needs a non-default value here. coerce_int96_tz: None, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index ef9c43176221c..2b367cf7600d5 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -508,6 +508,19 @@ pub(crate) fn parse_coerce_int96_string( } } +/// Validates that `tz` is a parseable IANA timezone and returns it as an +/// `Arc` for use in `Timestamp(_, Some(tz))` types. +pub(crate) fn parse_coerce_int96_tz_string( + tz: &str, +) -> datafusion_common::Result> { + tz.parse::().map_err(|e| { + DataFusionError::Configuration(format!( + "Invalid parquet coerce_int96_tz {tz:?}: {e}" + )) + })?; + Ok(Arc::::from(tz)) +} + /// Allows easy conversion from ParquetSource to Arc<dyn FileSource> impl From for Arc { fn from(source: ParquetSource) -> Self { @@ -564,14 +577,7 @@ impl FileSource for ParquetSource { .global .coerce_int96_tz .as_ref() - .map(|tz| { - tz.parse::().map_err(|e| { - DataFusionError::Configuration(format!( - "Invalid parquet coerce_int96_tz {tz:?}: {e}" - )) - })?; - Ok::<_, DataFusionError>(Arc::::from(tz.as_str())) - }) + .map(|tz| parse_coerce_int96_tz_string(tz)) .transpose()?; if coerce_int96_tz.is_some() && coerce_int96.is_none() { warn!( diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 8026e98d86424..7b7a4fb196503 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -748,9 +748,48 @@ ERROR: Cast error: Failed to convert -9357363680509551 to datetime for Timestamp statement ok drop table int96_from_spark; +# Enable coercion of int96 to microseconds AND attach a UTC timezone, so the +# resulting Arrow type is `Timestamp(unit, Some("UTC"))`. This matches Spark +# `TimestampType` semantics (INT96 values are UTC-adjusted instants). +statement ok +set datafusion.execution.parquet.coerce_int96 = ms; + +statement ok +set datafusion.execution.parquet.coerce_int96_tz = 'UTC'; + +statement ok +CREATE EXTERNAL TABLE int96_from_spark +STORED AS PARQUET +LOCATION '../../parquet-testing/data/int96_from_spark.parquet'; + +# The schema now carries the timezone. +query TTT +describe int96_from_spark; +---- +a Timestamp(ms, "UTC") YES + +# Same values as the non-tz `coerce_int96 = ms` block above, but rendered in +# UTC (note the trailing `Z`). +query P +select * from int96_from_spark +---- +2024-01-01T20:34:56.123Z +2024-01-01T01:00:00Z +9999-12-31T03:00:00Z +2024-12-30T23:00:00Z +NULL +ERROR: Cast error: Failed to convert -9357363680509551 to datetime for Timestamp(ms, "UTC") + +# Cleanup / reset default setting +statement ok +drop table int96_from_spark; + statement ok set datafusion.execution.parquet.coerce_int96 = ns; +statement ok +reset datafusion.execution.parquet.coerce_int96_tz; + ### Tests for metadata caching