From e3c7f275278301fa50c118c7e65e1f2f44f56670 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Mon, 30 Mar 2026 16:01:59 +0200 Subject: [PATCH 01/13] Add bool8 --- .../src/types/canonical_extensions/bool8.rs | 96 +++++++++++++++++++ .../src/types/canonical_extensions/mod.rs | 1 + 2 files changed, 97 insertions(+) create mode 100644 datafusion/common/src/types/canonical_extensions/bool8.rs diff --git a/datafusion/common/src/types/canonical_extensions/bool8.rs b/datafusion/common/src/types/canonical_extensions/bool8.rs new file mode 100644 index 0000000000000..d5aa973b08085 --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/bool8.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::_internal_err; +use crate::types::extension::DFExtensionType; +use arrow::array::{Array, Int8Array}; +use arrow::datatypes::DataType; +use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use std::fmt::Write; + +/// Defines the extension type logic for the canonical `arrow.uuid` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. 
+impl DFExtensionType for arrow_schema::extension::Bool8 { + fn storage_type(&self) -> DataType { + DataType::Int8 + } + + fn serialize_metadata(&self) -> Option { + None + } + + fn create_array_formatter<'fmt>( + &self, + array: &'fmt dyn Array, + options: &FormatOptions<'fmt>, + ) -> crate::Result>> { + if array.data_type() != &DataType::Int8 { + return _internal_err!("Wrong array type for Bool8"); + } + + let display_index = Bool8ValueDisplayIndex { + array: array.as_any().downcast_ref().unwrap(), + null_str: options.null(), + }; + Ok(Some(ArrayFormatter::new( + Box::new(display_index), + options.safe(), + ))) + } +} + +/// Pretty printer for Bool8 values (zero is `false`, any non-zero value is `true`). +#[derive(Debug, Clone, Copy)] +struct Bool8ValueDisplayIndex<'a> { + array: &'a Int8Array, + null_str: &'a str, +} + +impl DisplayIndex for Bool8ValueDisplayIndex<'_> { + fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { + if self.array.is_null(idx) { + write!(f, "{}", self.null_str)?; + return Ok(()); + } + + let bytes = self.array.value(idx); + write!(f, "{}", bytes != 0)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_pretty_bool8() { + let values = Int8Array::from_iter([Some(0), Some(1), Some(-20), None]); + + let extension_type = arrow_schema::extension::Bool8 {}; + let formatter = extension_type + .create_array_formatter(&values, &FormatOptions::default().with_null("NULL")) + .unwrap() + .unwrap(); + + assert_eq!(formatter.value(0).to_string(), "false"); + assert_eq!(formatter.value(1).to_string(), "true"); + assert_eq!(formatter.value(2).to_string(), "true"); + assert_eq!(formatter.value(3).to_string(), "NULL"); + } +} diff --git a/datafusion/common/src/types/canonical_extensions/mod.rs b/datafusion/common/src/types/canonical_extensions/mod.rs index e61c415b44811..d9be595074541 100644 --- a/datafusion/common/src/types/canonical_extensions/mod.rs +++ b/datafusion/common/src/types/canonical_extensions/mod.rs @@ -15,4 +15,5 @@ // specific 
language governing permissions and limitations // under the License. +mod bool8; mod uuid; From a3604ff27d4e7ca357acd28f35f4d40b4a7a7ef7 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Wed, 1 Apr 2026 09:54:38 +0200 Subject: [PATCH 02/13] Add draft for more extension types --- .../src/types/canonical_extensions/bool8.rs | 45 +++- .../src/types/canonical_extensions/json.rs | 78 ++++++ .../src/types/canonical_extensions/mod.rs | 18 +- .../src/types/canonical_extensions/opaque.rs | 78 ++++++ .../src/types/canonical_extensions/tensor.rs | 133 ++++++++++ .../timestamp_with_offset.rs | 227 ++++++++++++++++++ .../src/types/canonical_extensions/uuid.rs | 58 ++++- datafusion/common/src/types/extension.rs | 9 + datafusion/common/src/types/mod.rs | 1 + datafusion/expr/src/registry.rs | 31 ++- 10 files changed, 657 insertions(+), 21 deletions(-) create mode 100644 datafusion/common/src/types/canonical_extensions/json.rs create mode 100644 datafusion/common/src/types/canonical_extensions/opaque.rs create mode 100644 datafusion/common/src/types/canonical_extensions/tensor.rs create mode 100644 datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs diff --git a/datafusion/common/src/types/canonical_extensions/bool8.rs b/datafusion/common/src/types/canonical_extensions/bool8.rs index d5aa973b08085..448f418d20eeb 100644 --- a/datafusion/common/src/types/canonical_extensions/bool8.rs +++ b/datafusion/common/src/types/canonical_extensions/bool8.rs @@ -20,18 +20,55 @@ use crate::types::extension::DFExtensionType; use arrow::array::{Array, Int8Array}; use arrow::datatypes::DataType; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use arrow_schema::ArrowError; +use arrow_schema::extension::{Bool8, ExtensionType}; use std::fmt::Write; -/// Defines the extension type logic for the canonical `arrow.uuid` extension type. +/// Defines the extension type logic for the canonical `arrow.bool8` extension type. 
/// /// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. -impl DFExtensionType for arrow_schema::extension::Bool8 { +#[derive(Debug, Clone)] +pub struct DFBool8(Bool8); + +impl ExtensionType for DFBool8 { + const NAME: &'static str = Bool8::NAME; + type Metadata = ::Metadata; + + fn metadata(&self) -> &Self::Metadata { + self.0.metadata() + } + + fn serialize_metadata(&self) -> Option { + self.0.serialize_metadata() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + Bool8::deserialize_metadata(metadata) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + self.0.supports_data_type(data_type) + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> Result { + Ok(Self(::try_new( + data_type, metadata, + )?)) + } +} + +impl DFExtensionType for DFBool8 { fn storage_type(&self) -> DataType { DataType::Int8 } fn serialize_metadata(&self) -> Option { - None + self.0.serialize_metadata() } fn create_array_formatter<'fmt>( @@ -82,7 +119,7 @@ mod tests { pub fn test_pretty_bool8() { let values = Int8Array::from_iter([Some(0), Some(1), Some(-20), None]); - let extension_type = arrow_schema::extension::Bool8 {}; + let extension_type = DFBool8(Bool8 {}); let formatter = extension_type .create_array_formatter(&values, &FormatOptions::default().with_null("NULL")) .unwrap() diff --git a/datafusion/common/src/types/canonical_extensions/json.rs b/datafusion/common/src/types/canonical_extensions/json.rs new file mode 100644 index 0000000000000..0d26a8891ad2c --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/json.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. See the NOTICE file distributed with this +// work for additional information regarding copyright ownership. +// The ASF licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. You may obtain a +// copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::types::extension::DFExtensionType; +use arrow::datatypes::DataType; +use arrow_schema::ArrowError; +use arrow_schema::extension::{ExtensionType, Json}; + +/// Defines the extension type logic for the canonical `arrow.json` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. 
+#[derive(Debug, Clone)] +pub struct DFJson { + inner: Json, + storage_type: DataType, +} + +impl ExtensionType for DFJson { + const NAME: &'static str = Json::NAME; + type Metadata = ::Metadata; + + fn metadata(&self) -> &Self::Metadata { + self.inner.metadata() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + Json::deserialize_metadata(metadata) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + self.inner.supports_data_type(data_type) + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFJson { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} diff --git a/datafusion/common/src/types/canonical_extensions/mod.rs b/datafusion/common/src/types/canonical_extensions/mod.rs index d9be595074541..4e5045c746c4b 100644 --- a/datafusion/common/src/types/canonical_extensions/mod.rs +++ b/datafusion/common/src/types/canonical_extensions/mod.rs @@ -4,7 +4,12 @@ // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at +// with the License. See the NOTICE file distributed with this +// work for additional information regarding copyright ownership. +// The ASF licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. You may obtain a +// copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // @@ -16,4 +21,15 @@ // under the License. 
mod bool8; +mod json; +mod opaque; +mod tensor; +mod timestamp_with_offset; mod uuid; + +pub use bool8::DFBool8; +pub use json::DFJson; +pub use opaque::DFOpaque; +pub use tensor::{DFFixedShapeTensor, DFVariableShapeTensor}; +pub use timestamp_with_offset::DFTimestampWithOffset; +pub use uuid::DFUuid; diff --git a/datafusion/common/src/types/canonical_extensions/opaque.rs b/datafusion/common/src/types/canonical_extensions/opaque.rs new file mode 100644 index 0000000000000..c55bbc4deeb49 --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/opaque.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. See the NOTICE file distributed with this +// work for additional information regarding copyright ownership. +// The ASF licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. You may obtain a +// copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::types::extension::DFExtensionType; +use arrow::datatypes::DataType; +use arrow_schema::ArrowError; +use arrow_schema::extension::{ExtensionType, Opaque}; + +/// Defines the extension type logic for the canonical `arrow.opaque` extension type. 
+/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug, Clone)] +pub struct DFOpaque { + inner: Opaque, + storage_type: DataType, +} + +impl ExtensionType for DFOpaque { + const NAME: &'static str = Opaque::NAME; + type Metadata = ::Metadata; + + fn metadata(&self) -> &Self::Metadata { + self.inner.metadata() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + Opaque::deserialize_metadata(metadata) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + self.inner.supports_data_type(data_type) + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFOpaque { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} diff --git a/datafusion/common/src/types/canonical_extensions/tensor.rs b/datafusion/common/src/types/canonical_extensions/tensor.rs new file mode 100644 index 0000000000000..4dd53c8f498bb --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/tensor.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::types::extension::DFExtensionType; +use arrow::datatypes::DataType; +use arrow_schema::ArrowError; +use arrow_schema::extension::{ExtensionType, FixedShapeTensor, VariableShapeTensor}; + +/// Defines the extension type logic for the canonical `arrow.fixed_shape_tensor` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug, Clone)] +pub struct DFFixedShapeTensor { + inner: FixedShapeTensor, + /// The storage type of the tensor. + /// + /// While we could reconstruct the storage type from the inner [`FixedShapeTensor`], we may + /// choose a different name for the field within the [`DataType::FixedSizeList`] which can + /// cause problems down the line (e.g., checking for equality). 
+ storage_type: DataType, +} + +impl ExtensionType for DFFixedShapeTensor { + const NAME: &'static str = FixedShapeTensor::NAME; + type Metadata = ::Metadata; + + fn metadata(&self) -> &Self::Metadata { + self.inner.metadata() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + FixedShapeTensor::deserialize_metadata(metadata) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + self.inner.supports_data_type(data_type) + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFFixedShapeTensor { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} + +/// Defines the extension type logic for the canonical `arrow.variable_shape_tensor` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug, Clone)] +pub struct DFVariableShapeTensor { + inner: VariableShapeTensor, + /// While we could reconstruct the storage type from the inner [`VariableShapeTensor`], we may + /// choose a different name for the field within the [`DataType::List`] which can cause problems + /// down the line (e.g., checking for equality). 
+ storage_type: DataType, +} + +impl ExtensionType for DFVariableShapeTensor { + const NAME: &'static str = VariableShapeTensor::NAME; + type Metadata = ::Metadata; + + fn metadata(&self) -> &Self::Metadata { + self.inner.metadata() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + VariableShapeTensor::deserialize_metadata(metadata) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + self.inner.supports_data_type(data_type) + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFVariableShapeTensor { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} diff --git a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs new file mode 100644 index 0000000000000..3dfca2cf98e4e --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. See the NOTICE file distributed with this +// work for additional information regarding copyright ownership. +// The ASF licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file +// except in compliance with the License. 
You may obtain a +// copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::ScalarValue; +use crate::error::_internal_err; +use crate::types::extension::DFExtensionType; +use arrow::array::{Array, AsArray, Int16Array}; +use arrow::datatypes::{ + DataType, Int16Type, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, +}; +use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use arrow_schema::ArrowError; +use arrow_schema::extension::{ExtensionType, TimestampWithOffset}; +use std::fmt::Write; + +/// Defines the extension type logic for the canonical `arrow.timestamp_with_offset` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. 
+#[derive(Debug, Clone)] +pub struct DFTimestampWithOffset { + inner: TimestampWithOffset, + storage_type: DataType, +} + +impl ExtensionType for DFTimestampWithOffset { + const NAME: &'static str = TimestampWithOffset::NAME; + type Metadata = ::Metadata; + + fn metadata(&self) -> &Self::Metadata { + self.inner.metadata() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + TimestampWithOffset::deserialize_metadata(metadata) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + self.inner.supports_data_type(data_type) + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFTimestampWithOffset { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } + + fn create_array_formatter<'fmt>( + &self, + array: &'fmt dyn Array, + options: &FormatOptions<'fmt>, + ) -> crate::Result>> { + if array.data_type() != &self.storage_type { + return _internal_err!( + "Unexpected data type for TimestampWithOffset: {}", + array.data_type() + ); + } + + let struct_array = array.as_struct(); + let timestamp_array = struct_array + .column_by_name("timestamp") + .expect("Type checked above") + .as_ref(); + let offset_array = struct_array + .column_by_name("offset_minutes") + .expect("Type checked above") + .as_primitive::(); + + let display_index = TimestampWithOffsetDisplayIndex { + timestamp_array, + offset_array, + options: options.clone(), + }; + + Ok(Some(ArrayFormatter::new( + Box::new(display_index), + options.safe(), + ))) + } +} + +struct TimestampWithOffsetDisplayIndex<'a> { + timestamp_array: &'a dyn Array, + offset_array: &'a Int16Array, + options: FormatOptions<'a>, +} + +impl 
DisplayIndex for TimestampWithOffsetDisplayIndex<'_> { + fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { + let offset_minutes = self.offset_array.value(idx); + let offset = format_offset(offset_minutes); + + // The timestamp array must be UTC, so we can ignore the timezone. + let scalar = match self.timestamp_array.data_type() { + DataType::Timestamp(TimeUnit::Second, _) => { + let ts = self + .timestamp_array + .as_primitive::() + .value(idx); + ScalarValue::TimestampSecond(Some(ts), Some(offset.into())) + } + DataType::Timestamp(TimeUnit::Millisecond, _) => { + let ts = self + .timestamp_array + .as_primitive::() + .value(idx); + ScalarValue::TimestampMillisecond(Some(ts), Some(offset.into())) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + let ts = self + .timestamp_array + .as_primitive::() + .value(idx); + ScalarValue::TimestampMicrosecond(Some(ts), Some(offset.into())) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + let ts = self + .timestamp_array + .as_primitive::() + .value(idx); + ScalarValue::TimestampNanosecond(Some(ts), Some(offset.into())) + } + _ => unreachable!("TimestampWithOffset storage must be a Timestamp array"), + }; + + let array = scalar.to_array().map_err(|_| { + ArrowError::ComputeError("Failed to convert scalar to array".to_owned()) + })?; + let formatter = ArrayFormatter::try_new(&array, &self.options)?; + formatter.value(0).write(f)?; + + Ok(()) + } +} + +/// Formats the offset in the format `+/-HH:MM`, which can be used as an offset in the regular +/// timestamp types. 
+fn format_offset(minutes: i16) -> String { + let sign = if minutes >= 0 { '+' } else { '-' }; + let minutes = minutes.abs(); + let hours = minutes / 60; + let minutes = minutes % 60; + format!("{sign}{hours:02}:{minutes:02}") +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{Field, Fields}; + use chrono::{TimeZone, Utc}; + + #[test] + fn test_pretty_print_timestamp_with_offset() -> Result<(), ArrowError> { + let fields = create_fields(TimeUnit::Second); + + let ts = Utc + .with_ymd_and_hms(2024, 4, 1, 0, 0, 0) + .unwrap() + .timestamp(); + + let timestamp_array = + Arc::new(TimestampSecondArray::from(vec![ts, ts]).with_timezone("UTC")); + let offset_array = Arc::new(Int16Array::from(vec![60, -105])); + + let struct_array = + StructArray::try_new(fields, vec![timestamp_array, offset_array], None)?; + + let formatter = DFTimestampWithOffset::try_new(struct_array.data_type(), ())? + .create_array_formatter(&struct_array, &FormatOptions::default())? + .unwrap(); + + assert_eq!(formatter.value(0).to_string(), "2024-04-01T01:00:00+01:00"); + assert_eq!(formatter.value(1).to_string(), "2024-03-31T22:15:00-01:45"); + + Ok(()) + } + + fn create_fields(time_unit: TimeUnit) -> Fields { + let ts_field = Field::new( + "timestamp", + DataType::Timestamp(time_unit, Some("UTC".into())), + false, + ); + let offset_field = Field::new("offset_minutes", DataType::Int16, false); + Fields::from(vec![ts_field, offset_field]) + } +} diff --git a/datafusion/common/src/types/canonical_extensions/uuid.rs b/datafusion/common/src/types/canonical_extensions/uuid.rs index a4011e5827f8b..8647d612125f6 100644 --- a/datafusion/common/src/types/canonical_extensions/uuid.rs +++ b/datafusion/common/src/types/canonical_extensions/uuid.rs @@ -20,19 +20,54 @@ use crate::types::extension::DFExtensionType; use arrow::array::{Array, FixedSizeBinaryArray}; use arrow::datatypes::DataType; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use 
arrow_schema::ArrowError; +use arrow_schema::extension::{ExtensionType, Uuid}; use std::fmt::Write; -use uuid::{Bytes, Uuid}; +use uuid::{Bytes, Uuid as UuidImpl}; /// Defines the extension type logic for the canonical `arrow.uuid` extension type. /// /// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. -impl DFExtensionType for arrow_schema::extension::Uuid { +#[derive(Debug, Clone)] +pub struct DFUuid(Uuid); + +impl ExtensionType for DFUuid { + const NAME: &'static str = Uuid::NAME; + type Metadata = ::Metadata; + + fn metadata(&self) -> &Self::Metadata { + self.0.metadata() + } + + fn serialize_metadata(&self) -> Option { + self.0.serialize_metadata() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + Uuid::deserialize_metadata(metadata) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + self.0.supports_data_type(data_type) + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> Result { + Ok(Self(::try_new(data_type, metadata)?)) + } +} + +impl DFExtensionType for DFUuid { fn storage_type(&self) -> DataType { DataType::FixedSizeBinary(16) } fn serialize_metadata(&self) -> Option { - None + self.0.serialize_metadata() } fn create_array_formatter<'fmt>( @@ -71,7 +106,7 @@ impl DisplayIndex for UuidValueDisplayIndex<'_> { let bytes = Bytes::try_from(self.array.value(idx)) .expect("FixedSizeBinaryArray length checked in create_array_formatter"); - let uuid = Uuid::from_bytes(bytes); + let uuid = UuidImpl::from_bytes(bytes); write!(f, "{uuid}")?; Ok(()) } @@ -83,21 +118,20 @@ mod tests { use crate::ScalarValue; #[test] - pub fn test_pretty_print_uuid() { - let my_uuid = Uuid::nil(); + pub fn test_pretty_print_uuid() -> Result<(), ArrowError> { + let my_uuid = uuid::Uuid::nil(); let uuid = ScalarValue::FixedSizeBinary(16, Some(my_uuid.as_bytes().to_vec())) - .to_array_of_size(1) - .unwrap(); + .to_array_of_size(1)?; - let extension_type = 
arrow_schema::extension::Uuid {}; - let formatter = extension_type - .create_array_formatter(uuid.as_ref(), &FormatOptions::default()) - .unwrap() + let formatter = DFUuid::try_new(uuid.data_type(), ())? + .create_array_formatter(uuid.as_ref(), &FormatOptions::default())? .unwrap(); assert_eq!( formatter.value(0).to_string(), "00000000-0000-0000-0000-000000000000" ); + + Ok(()) } } diff --git a/datafusion/common/src/types/extension.rs b/datafusion/common/src/types/extension.rs index c05f8f19f18a9..ec3915abb063d 100644 --- a/datafusion/common/src/types/extension.rs +++ b/datafusion/common/src/types/extension.rs @@ -55,6 +55,15 @@ pub type DFExtensionTypeRef = Arc; /// Furthermore, the current trait in arrow-rs is not dyn-compatible, which we need for implementing /// extension type registries. In the future, the two implementations may increasingly converge. /// +/// Another difference is that [`DFExtensionType`] represents a fully resolved extension type that +/// has a fixed storage type (i.e., [`DataType`]). This is different from arrow-rs, which does not +/// have a fixed storage type. For example, while the DataFusion and arrow-rs JSON types share the +/// same metadata, an instance of DataFusion's extension type chooses one of the three possible +/// storage types: [`DataType::Utf8`], [`DataType::LargeUtf8`], or [`DataType::Utf8View`]. +/// This fixed storage type is returned in [`DFExtensionType::storage_type`]. This is also the +/// reason why we have different structs in DataFusion (e.g., [`DFJson`](crate::types::DFJson) +/// instead of [`Json`](arrow_schema::extension::Json)). 
+/// /// # Examples /// /// Examples for using the extension type machinery can be found in the DataFusion examples diff --git a/datafusion/common/src/types/mod.rs b/datafusion/common/src/types/mod.rs index 82455063bc6ce..57bf921a6d564 100644 --- a/datafusion/common/src/types/mod.rs +++ b/datafusion/common/src/types/mod.rs @@ -23,6 +23,7 @@ mod logical; mod native; pub use builtin::*; +pub use canonical_extensions::*; pub use extension::*; pub use field::*; pub use logical::*; diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 52a29019bc96b..e6cfea4ce5868 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -23,7 +23,10 @@ use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; use arrow::datatypes::Field; use arrow_schema::DataType; use arrow_schema::extension::ExtensionType; -use datafusion_common::types::{DFExtensionType, DFExtensionTypeRef}; +use datafusion_common::types::{ + DFBool8, DFExtensionType, DFExtensionTypeRef, DFFixedShapeTensor, DFJson, DFOpaque, + DFTimestampWithOffset, DFUuid, DFVariableShapeTensor, +}; use datafusion_common::{HashMap, Result, not_impl_err, plan_datafusion_err}; use std::collections::HashSet; use std::fmt::{Debug, Formatter}; @@ -424,9 +427,29 @@ impl MemoryExtensionTypeRegistry { /// Pre-registers the [canonical extension types](https://arrow.apache.org/docs/format/CanonicalExtensions.html) /// in the extension type registry. pub fn new_with_canonical_extension_types() -> Self { - let mapping = [DefaultExtensionTypeRegistration::new_arc(|_, _| { - Ok(arrow_schema::extension::Uuid {}) - })]; + let mapping = [ + DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { + Ok(DFFixedShapeTensor::try_new(storage_type, metadata)?) + }), + DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { + Ok(DFVariableShapeTensor::try_new(storage_type, metadata)?) 
+ }), + DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { + Ok(DFJson::try_new(storage_type, metadata)?) + }), + DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { + Ok(DFUuid::try_new(storage_type, metadata)?) + }), + DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { + Ok(DFOpaque::try_new(storage_type, metadata)?) + }), + DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { + Ok(DFBool8::try_new(storage_type, metadata)?) + }), + DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { + Ok(DFTimestampWithOffset::try_new(storage_type, metadata)?) + }), + ]; let mut extension_types = HashMap::new(); for registration in mapping.into_iter() { From 754a8d7e853750636eafebd789bfff3c345068eb Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Wed, 1 Apr 2026 10:08:02 +0200 Subject: [PATCH 03/13] Split tensor extension types into multiple files --- .../{tensor.rs => fixed_shape_tensor.rs} | 57 +------------- .../src/types/canonical_extensions/mod.rs | 6 +- .../timestamp_with_offset.rs | 2 + .../variable_shape_tensor.rs | 76 +++++++++++++++++++ 4 files changed, 83 insertions(+), 58 deletions(-) rename datafusion/common/src/types/canonical_extensions/{tensor.rs => fixed_shape_tensor.rs} (59%) create mode 100644 datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs diff --git a/datafusion/common/src/types/canonical_extensions/tensor.rs b/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs similarity index 59% rename from datafusion/common/src/types/canonical_extensions/tensor.rs rename to datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs index 4dd53c8f498bb..89f5a3e21fc98 100644 --- a/datafusion/common/src/types/canonical_extensions/tensor.rs +++ b/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs @@ -18,7 +18,7 @@ use crate::types::extension::DFExtensionType; use arrow::datatypes::DataType; use 
arrow_schema::ArrowError; -use arrow_schema::extension::{ExtensionType, FixedShapeTensor, VariableShapeTensor}; +use arrow_schema::extension::{ExtensionType, FixedShapeTensor}; /// Defines the extension type logic for the canonical `arrow.fixed_shape_tensor` extension type. /// @@ -76,58 +76,3 @@ impl DFExtensionType for DFFixedShapeTensor { self.inner.serialize_metadata() } } - -/// Defines the extension type logic for the canonical `arrow.variable_shape_tensor` extension type. -/// -/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. -#[derive(Debug, Clone)] -pub struct DFVariableShapeTensor { - inner: VariableShapeTensor, - /// While we could reconstruct the storage type from the inner [`VariableShapeTensor`], we may - /// choose a different name for the field within the [`DataType::List`] which can cause problems - /// down the line (e.g., checking for equality). - storage_type: DataType, -} - -impl ExtensionType for DFVariableShapeTensor { - const NAME: &'static str = VariableShapeTensor::NAME; - type Metadata = ::Metadata; - - fn metadata(&self) -> &Self::Metadata { - self.inner.metadata() - } - - fn serialize_metadata(&self) -> Option { - self.inner.serialize_metadata() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - VariableShapeTensor::deserialize_metadata(metadata) - } - - fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { - self.inner.supports_data_type(data_type) - } - - fn try_new( - data_type: &DataType, - metadata: Self::Metadata, - ) -> Result { - Ok(Self { - inner: ::try_new(data_type, metadata)?, - storage_type: data_type.clone(), - }) - } -} - -impl DFExtensionType for DFVariableShapeTensor { - fn storage_type(&self) -> DataType { - self.storage_type.clone() - } - - fn serialize_metadata(&self) -> Option { - self.inner.serialize_metadata() - } -} diff --git a/datafusion/common/src/types/canonical_extensions/mod.rs 
b/datafusion/common/src/types/canonical_extensions/mod.rs index 4e5045c746c4b..3744245d8a82a 100644 --- a/datafusion/common/src/types/canonical_extensions/mod.rs +++ b/datafusion/common/src/types/canonical_extensions/mod.rs @@ -21,15 +21,17 @@ // under the License. mod bool8; +mod fixed_shape_tensor; mod json; mod opaque; -mod tensor; mod timestamp_with_offset; mod uuid; +mod variable_shape_tensor; pub use bool8::DFBool8; +pub use fixed_shape_tensor::DFFixedShapeTensor; pub use json::DFJson; pub use opaque::DFOpaque; -pub use tensor::{DFFixedShapeTensor, DFVariableShapeTensor}; pub use timestamp_with_offset::DFTimestampWithOffset; pub use uuid::DFUuid; +pub use variable_shape_tensor::DFVariableShapeTensor; diff --git a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs index 3dfca2cf98e4e..211cac7e098ec 100644 --- a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs +++ b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs @@ -186,8 +186,10 @@ fn format_offset(minutes: i16) -> String { #[cfg(test)] mod tests { use super::*; + use arrow::array::{StructArray, TimestampSecondArray}; use arrow::datatypes::{Field, Fields}; use chrono::{TimeZone, Utc}; + use std::sync::Arc; #[test] fn test_pretty_print_timestamp_with_offset() -> Result<(), ArrowError> { diff --git a/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs b/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs new file mode 100644 index 0000000000000..0e8b7a45ec609 --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::types::extension::DFExtensionType; +use arrow::datatypes::DataType; +use arrow_schema::ArrowError; +use arrow_schema::extension::{ExtensionType, VariableShapeTensor}; + +/// Defines the extension type logic for the canonical `arrow.variable_shape_tensor` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug, Clone)] +pub struct DFVariableShapeTensor { + inner: VariableShapeTensor, + /// While we could reconstruct the storage type from the inner [`VariableShapeTensor`], we may + /// choose a different name for the field within the [`DataType::List`] which can cause problems + /// down the line (e.g., checking for equality). 
+ storage_type: DataType, +} + +impl ExtensionType for DFVariableShapeTensor { + const NAME: &'static str = VariableShapeTensor::NAME; + type Metadata = ::Metadata; + + fn metadata(&self) -> &Self::Metadata { + self.inner.metadata() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> Result { + VariableShapeTensor::deserialize_metadata(metadata) + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + self.inner.supports_data_type(data_type) + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFVariableShapeTensor { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} From 78e9404e1d7078705c0b84040ba353d99661bd64 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Wed, 1 Apr 2026 10:21:07 +0200 Subject: [PATCH 04/13] Fix license headers --- datafusion/common/src/types/canonical_extensions/json.rs | 7 +------ datafusion/common/src/types/canonical_extensions/mod.rs | 7 +------ datafusion/common/src/types/canonical_extensions/opaque.rs | 7 +------ .../types/canonical_extensions/timestamp_with_offset.rs | 7 +------ 4 files changed, 4 insertions(+), 24 deletions(-) diff --git a/datafusion/common/src/types/canonical_extensions/json.rs b/datafusion/common/src/types/canonical_extensions/json.rs index 0d26a8891ad2c..3a4028c4c0cac 100644 --- a/datafusion/common/src/types/canonical_extensions/json.rs +++ b/datafusion/common/src/types/canonical_extensions/json.rs @@ -4,12 +4,7 @@ // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance -// with the License. 
See the NOTICE file distributed with this -// work for additional information regarding copyright ownership. -// The ASF licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file -// except in compliance with the License. You may obtain a -// copy of the License at +// with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // diff --git a/datafusion/common/src/types/canonical_extensions/mod.rs b/datafusion/common/src/types/canonical_extensions/mod.rs index 3744245d8a82a..2d74d0669d213 100644 --- a/datafusion/common/src/types/canonical_extensions/mod.rs +++ b/datafusion/common/src/types/canonical_extensions/mod.rs @@ -4,12 +4,7 @@ // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance -// with the License. See the NOTICE file distributed with this -// work for additional information regarding copyright ownership. -// The ASF licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file -// except in compliance with the License. You may obtain a -// copy of the License at +// with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // diff --git a/datafusion/common/src/types/canonical_extensions/opaque.rs b/datafusion/common/src/types/canonical_extensions/opaque.rs index c55bbc4deeb49..5710612b51cea 100644 --- a/datafusion/common/src/types/canonical_extensions/opaque.rs +++ b/datafusion/common/src/types/canonical_extensions/opaque.rs @@ -4,12 +4,7 @@ // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance -// with the License. 
See the NOTICE file distributed with this -// work for additional information regarding copyright ownership. -// The ASF licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file -// except in compliance with the License. You may obtain a -// copy of the License at +// with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // diff --git a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs index 211cac7e098ec..bd05149b4d939 100644 --- a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs +++ b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs @@ -4,12 +4,7 @@ // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance -// with the License. See the NOTICE file distributed with this -// work for additional information regarding copyright ownership. -// The ASF licenses this file to you under the Apache License, -// Version 2.0 (the "License"); you may not use this file -// except in compliance with the License. You may obtain a -// copy of the License at +// with the License. 
You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // From 6a425643e91fdc75bb5c4b9e967b78556029fd0e Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Wed, 1 Apr 2026 10:26:58 +0200 Subject: [PATCH 05/13] Add null support for TimestampWithOffset display --- .../timestamp_with_offset.rs | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs index bd05149b4d939..fd8b657fb4d29 100644 --- a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs +++ b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs @@ -19,6 +19,7 @@ use crate::ScalarValue; use crate::error::_internal_err; use crate::types::extension::DFExtensionType; use arrow::array::{Array, AsArray, Int16Array}; +use arrow::buffer::NullBuffer; use arrow::datatypes::{ DataType, Int16Type, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, @@ -102,6 +103,7 @@ impl DFExtensionType for DFTimestampWithOffset { .as_primitive::(); let display_index = TimestampWithOffsetDisplayIndex { + null_buffer: struct_array.nulls(), timestamp_array, offset_array, options: options.clone(), @@ -115,6 +117,9 @@ impl DFExtensionType for DFTimestampWithOffset { } struct TimestampWithOffsetDisplayIndex<'a> { + /// The inner arrays are always non-null. Use the null buffer of the struct array to check + /// whether an element is null. 
+ null_buffer: Option<&'a NullBuffer>, timestamp_array: &'a dyn Array, offset_array: &'a Int16Array, options: FormatOptions<'a>, @@ -122,6 +127,11 @@ struct TimestampWithOffsetDisplayIndex<'a> { impl DisplayIndex for TimestampWithOffsetDisplayIndex<'_> { fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { + if self.null_buffer.map(|nb| nb.is_null(idx)).unwrap_or(false) { + write!(f, "{}", self.options.null())?; + return Ok(()); + } + let offset_minutes = self.offset_array.value(idx); let offset = format_offset(offset_minutes); @@ -196,18 +206,25 @@ mod tests { .timestamp(); let timestamp_array = - Arc::new(TimestampSecondArray::from(vec![ts, ts]).with_timezone("UTC")); - let offset_array = Arc::new(Int16Array::from(vec![60, -105])); + Arc::new(TimestampSecondArray::from(vec![ts, ts, ts]).with_timezone("UTC")); + let offset_array = Arc::new(Int16Array::from(vec![60, -105, 0])); - let struct_array = - StructArray::try_new(fields, vec![timestamp_array, offset_array], None)?; + let struct_array = StructArray::try_new( + fields, + vec![timestamp_array, offset_array], + Some(NullBuffer::from(vec![true, true, false])), + )?; let formatter = DFTimestampWithOffset::try_new(struct_array.data_type(), ())? - .create_array_formatter(&struct_array, &FormatOptions::default())? + .create_array_formatter( + &struct_array, + &FormatOptions::default().with_null("NULL"), + )? 
.unwrap(); assert_eq!(formatter.value(0).to_string(), "2024-04-01T01:00:00+01:00"); assert_eq!(formatter.value(1).to_string(), "2024-03-31T22:15:00-01:45"); + assert_eq!(formatter.value(2).to_string(), "NULL"); Ok(()) } From d62be49f112948e1261b402f50e859eff64e0a4a Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Wed, 1 Apr 2026 10:30:59 +0200 Subject: [PATCH 06/13] Minor changes --- .../common/src/types/canonical_extensions/uuid.rs | 4 ++-- datafusion/common/src/types/extension.rs | 15 ++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/datafusion/common/src/types/canonical_extensions/uuid.rs b/datafusion/common/src/types/canonical_extensions/uuid.rs index 8647d612125f6..bb1762b6bfe16 100644 --- a/datafusion/common/src/types/canonical_extensions/uuid.rs +++ b/datafusion/common/src/types/canonical_extensions/uuid.rs @@ -23,7 +23,7 @@ use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatRe use arrow_schema::ArrowError; use arrow_schema::extension::{ExtensionType, Uuid}; use std::fmt::Write; -use uuid::{Bytes, Uuid as UuidImpl}; +use uuid::Bytes; /// Defines the extension type logic for the canonical `arrow.uuid` extension type. /// @@ -106,7 +106,7 @@ impl DisplayIndex for UuidValueDisplayIndex<'_> { let bytes = Bytes::try_from(self.array.value(idx)) .expect("FixedSizeBinaryArray length checked in create_array_formatter"); - let uuid = UuidImpl::from_bytes(bytes); + let uuid = uuid::Uuid::from_bytes(bytes); write!(f, "{uuid}")?; Ok(()) } diff --git a/datafusion/common/src/types/extension.rs b/datafusion/common/src/types/extension.rs index ec3915abb063d..3bcb533dbf9e6 100644 --- a/datafusion/common/src/types/extension.rs +++ b/datafusion/common/src/types/extension.rs @@ -56,13 +56,14 @@ pub type DFExtensionTypeRef = Arc; /// extension type registries. In the future, the two implementations may increasingly converge. 
/// /// Another difference is that [`DFExtensionType`] represents a fully resolved extension type that -/// has a fixed storage type (i.e., [`DataType`]). This is different from arrow-rs, which does not -/// have a fixed storage type. For example, while the DataFusion and arrow-rs JSON types share the -/// same metadata, an instance of DataFusion's extension type chooses one of the three possible -/// storage types: [`DataType::Utf8`], [`DataType::LargeUtf8`], or [`DataType::Utf8View`]. -/// This fixed storaga type is returned in [`DFExtensionType::storage_type`]. This is also the -/// reason why we have different structs in DataFusion (e.g., [`DFJson`](crate::types::DFJson) -/// instead of [`Json`](arrow_schema::extension::Json)). +/// has a fixed storage type (i.e., [`DataType`]). This is different from arrow-rs, which only +/// stores the extension type's metadata. For example, an instance of DataFusion's JSON extension +/// type fixes one of the three possible storage types: [`DataType::Utf8`], +/// [`DataType::LargeUtf8`], or [`DataType::Utf8View`]. This fixed storage type is returned in +/// [`DFExtensionType::storage_type`]. This is not possible in arrow-rs' extension type instances. +/// This is the reason why we have different types in DataFusion that usually delegate the metadata +/// processing to the underlying arrow-rs extension type instance +/// (e.g., [`DFJson`](crate::types::DFJson) instead of [`Json`](arrow_schema::extension::Json)).
/// /// # Examples /// From e902af26c594be5ba1729455fe823493978c156d Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Wed, 1 Apr 2026 18:41:25 +0200 Subject: [PATCH 07/13] Support further offset types --- .../timestamp_with_offset.rs | 135 ++++++++++++++---- 1 file changed, 106 insertions(+), 29 deletions(-) diff --git a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs index fd8b657fb4d29..752e8b9c68a84 100644 --- a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs +++ b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs @@ -15,18 +15,19 @@ // specific language governing permissions and limitations // under the License. -use crate::ScalarValue; use crate::error::_internal_err; use crate::types::extension::DFExtensionType; +use crate::ScalarValue; use arrow::array::{Array, AsArray, Int16Array}; use arrow::buffer::NullBuffer; +use arrow::compute::cast; use arrow::datatypes::{ - DataType, Int16Type, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + DataType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, }; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; -use arrow_schema::ArrowError; use arrow_schema::extension::{ExtensionType, TimestampWithOffset}; +use arrow_schema::ArrowError; use std::fmt::Write; /// Defines the extension type logic for the canonical `arrow.timestamp_with_offset` extension type. 
@@ -97,10 +98,14 @@ impl DFExtensionType for DFTimestampWithOffset { .column_by_name("timestamp") .expect("Type checked above") .as_ref(); - let offset_array = struct_array + let raw_offset_array = struct_array .column_by_name("offset_minutes") - .expect("Type checked above") - .as_primitive::(); + .expect("Type checked above"); + + // Get a regular [`Int16Array`], if the offset array is a dictionary or run-length encoded. + let offset_array = cast(&raw_offset_array, &DataType::Int16)? + .as_primitive() + .clone(); let display_index = TimestampWithOffsetDisplayIndex { null_buffer: struct_array.nulls(), @@ -121,7 +126,7 @@ struct TimestampWithOffsetDisplayIndex<'a> { /// whether an element is null. null_buffer: Option<&'a NullBuffer>, timestamp_array: &'a dyn Array, - offset_array: &'a Int16Array, + offset_array: Int16Array, options: FormatOptions<'a>, } @@ -191,51 +196,123 @@ fn format_offset(minutes: i16) -> String { #[cfg(test)] mod tests { use super::*; - use arrow::array::{StructArray, TimestampSecondArray}; - use arrow::datatypes::{Field, Fields}; + use arrow::array::{ + Array, DictionaryArray, Int16Array, Int32Array, RunArray, StructArray, + TimestampSecondArray, + }; + use arrow::buffer::NullBuffer; + use arrow::datatypes::{Field, Fields, Int32Type}; use chrono::{TimeZone, Utc}; use std::sync::Arc; #[test] fn test_pretty_print_timestamp_with_offset() -> Result<(), ArrowError> { - let fields = create_fields(TimeUnit::Second); - let ts = Utc .with_ymd_and_hms(2024, 4, 1, 0, 0, 0) .unwrap() .timestamp(); - let timestamp_array = - Arc::new(TimestampSecondArray::from(vec![ts, ts, ts]).with_timezone("UTC")); let offset_array = Arc::new(Int16Array::from(vec![60, -105, 0])); - let struct_array = StructArray::try_new( - fields, - vec![timestamp_array, offset_array], + run_formatting_test( + vec![ts, ts, ts], + offset_array, Some(NullBuffer::from(vec![true, true, false])), - )?; + FormatOptions::default().with_null("NULL"), + &[ + "2024-04-01T01:00:00+01:00", + 
"2024-03-31T22:15:00-01:45", + "NULL", + ], + ) + } - let formatter = DFTimestampWithOffset::try_new(struct_array.data_type(), ())? - .create_array_formatter( - &struct_array, - &FormatOptions::default().with_null("NULL"), - )? - .unwrap(); + #[test] + fn test_pretty_print_dictionary_offset() -> Result<(), ArrowError> { + let ts = Utc + .with_ymd_and_hms(2024, 4, 1, 12, 0, 0) + .unwrap() + .timestamp(); - assert_eq!(formatter.value(0).to_string(), "2024-04-01T01:00:00+01:00"); - assert_eq!(formatter.value(1).to_string(), "2024-03-31T22:15:00-01:45"); - assert_eq!(formatter.value(2).to_string(), "NULL"); + let offset_array = Arc::new(DictionaryArray::::new( + Int16Array::from(vec![0, 1, 0]), + Arc::new(Int16Array::from(vec![60, -60])), + )); - Ok(()) + run_formatting_test( + vec![ts, ts, ts], + offset_array, + None, + FormatOptions::default(), + &[ + "2024-04-01T13:00:00+01:00", + "2024-04-01T11:00:00-01:00", + "2024-04-01T13:00:00+01:00", + ], + ) } - fn create_fields(time_unit: TimeUnit) -> Fields { + #[test] + fn test_pretty_print_rle_offset() -> Result<(), ArrowError> { + let ts = Utc + .with_ymd_and_hms(2024, 4, 1, 12, 0, 0) + .unwrap() + .timestamp(); + + let run_ends = Int32Array::from(vec![2]); + let values = Int16Array::from(vec![120]); + let offset_array = Arc::new(RunArray::::try_new(&run_ends, &values)?); + + run_formatting_test( + vec![ts, ts], + offset_array, + None, + FormatOptions::default(), + &["2024-04-01T14:00:00+02:00", "2024-04-01T14:00:00+02:00"], + ) + } + + /// Create valid fields with flexible offset types + fn create_fields_custom_offset(time_unit: TimeUnit, offset_type: DataType) -> Fields { let ts_field = Field::new( "timestamp", DataType::Timestamp(time_unit, Some("UTC".into())), false, ); - let offset_field = Field::new("offset_minutes", DataType::Int16, false); + let offset_field = Field::new("offset_minutes", offset_type, false); Fields::from(vec![ts_field, offset_field]) } + + /// Helper to construct the arrays, run the formatter, 
and assert the expected strings. + fn run_formatting_test( + timestamps: Vec, + offset_array: Arc, + null_buffer: Option, + options: FormatOptions, + expected: &[&str], + ) -> Result<(), ArrowError> { + let fields = create_fields_custom_offset( + TimeUnit::Second, + offset_array.data_type().clone(), + ); + + let struct_array = StructArray::try_new( + fields, + vec![ + Arc::new(TimestampSecondArray::from(timestamps).with_timezone("UTC")), + offset_array, + ], + null_buffer, + )?; + + let formatter = DFTimestampWithOffset::try_new(struct_array.data_type(), ())? + .create_array_formatter(&struct_array, &options)? + .unwrap(); + + for (i, expected_str) in expected.iter().enumerate() { + assert_eq!(formatter.value(i).to_string(), *expected_str); + } + + Ok(()) + } } From 150b685c39b8fc359c07debe6044cadd4797a054 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Wed, 1 Apr 2026 19:01:23 +0200 Subject: [PATCH 08/13] Fix issue with use statement --- .../src/types/canonical_extensions/timestamp_with_offset.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs index 752e8b9c68a84..6df733a99dd2a 100644 --- a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs +++ b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. 
+use crate::ScalarValue; use crate::error::_internal_err; use crate::types::extension::DFExtensionType; -use crate::ScalarValue; use arrow::array::{Array, AsArray, Int16Array}; use arrow::buffer::NullBuffer; use arrow::compute::cast; @@ -26,8 +26,8 @@ use arrow::datatypes::{ TimestampNanosecondType, TimestampSecondType, }; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; -use arrow_schema::extension::{ExtensionType, TimestampWithOffset}; use arrow_schema::ArrowError; +use arrow_schema::extension::{ExtensionType, TimestampWithOffset}; use std::fmt::Write; /// Defines the extension type logic for the canonical `arrow.timestamp_with_offset` extension type. @@ -201,7 +201,7 @@ mod tests { TimestampSecondArray, }; use arrow::buffer::NullBuffer; - use arrow::datatypes::{Field, Fields, Int32Type}; + use arrow::datatypes::{Field, Fields, Int16Type, Int32Type}; use chrono::{TimeZone, Utc}; use std::sync::Arc; From 5cbf369691d3ec24eeedbb363fc5776aab6b0b38 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 2 Apr 2026 10:23:00 +0200 Subject: [PATCH 09/13] Remove necessity to implement arrow-rs' ExtensionType for DF extension types --- .../src/types/canonical_extensions/bool8.rs | 35 ++---- .../fixed_shape_tensor.rs | 33 ++---- .../src/types/canonical_extensions/json.rs | 33 ++---- .../src/types/canonical_extensions/opaque.rs | 33 ++---- .../timestamp_with_offset.rs | 34 ++---- .../src/types/canonical_extensions/uuid.rs | 36 ++---- .../variable_shape_tensor.rs | 33 ++---- datafusion/expr/src/registry.rs | 112 ++++++++++-------- 8 files changed, 116 insertions(+), 233 deletions(-) diff --git a/datafusion/common/src/types/canonical_extensions/bool8.rs b/datafusion/common/src/types/canonical_extensions/bool8.rs index 448f418d20eeb..9988bf01543be 100644 --- a/datafusion/common/src/types/canonical_extensions/bool8.rs +++ b/datafusion/common/src/types/canonical_extensions/bool8.rs @@ -15,12 +15,12 @@ // specific language governing 
permissions and limitations // under the License. +use crate::Result; use crate::error::_internal_err; use crate::types::extension::DFExtensionType; use arrow::array::{Array, Int8Array}; use arrow::datatypes::DataType; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; -use arrow_schema::ArrowError; use arrow_schema::extension::{Bool8, ExtensionType}; use std::fmt::Write; @@ -30,32 +30,13 @@ use std::fmt::Write; #[derive(Debug, Clone)] pub struct DFBool8(Bool8); -impl ExtensionType for DFBool8 { - const NAME: &'static str = Bool8::NAME; - type Metadata = ::Metadata; - - fn metadata(&self) -> &Self::Metadata { - self.0.metadata() - } - - fn serialize_metadata(&self) -> Option { - self.0.serialize_metadata() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - Bool8::deserialize_metadata(metadata) - } - - fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { - self.0.supports_data_type(data_type) - } - - fn try_new( +impl DFBool8 { + /// Creates a new [`DFBool8`], validating that the storage type is compatible with the + /// extension type. 
+ pub fn try_new( data_type: &DataType, - metadata: Self::Metadata, - ) -> Result { + metadata: ::Metadata, + ) -> Result { Ok(Self(::try_new( data_type, metadata, )?)) @@ -75,7 +56,7 @@ impl DFExtensionType for DFBool8 { &self, array: &'fmt dyn Array, options: &FormatOptions<'fmt>, - ) -> crate::Result>> { + ) -> Result>> { if array.data_type() != &DataType::Int8 { return _internal_err!("Wrong array type for Bool8"); } diff --git a/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs b/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs index 89f5a3e21fc98..7f9789d1a2a30 100644 --- a/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs +++ b/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use crate::Result; use crate::types::extension::DFExtensionType; use arrow::datatypes::DataType; -use arrow_schema::ArrowError; use arrow_schema::extension::{ExtensionType, FixedShapeTensor}; /// Defines the extension type logic for the canonical `arrow.fixed_shape_tensor` extension type. @@ -34,32 +34,13 @@ pub struct DFFixedShapeTensor { storage_type: DataType, } -impl ExtensionType for DFFixedShapeTensor { - const NAME: &'static str = FixedShapeTensor::NAME; - type Metadata = ::Metadata; - - fn metadata(&self) -> &Self::Metadata { - self.inner.metadata() - } - - fn serialize_metadata(&self) -> Option { - self.inner.serialize_metadata() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - FixedShapeTensor::deserialize_metadata(metadata) - } - - fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { - self.inner.supports_data_type(data_type) - } - - fn try_new( +impl DFFixedShapeTensor { + /// Creates a new [`DFFixedShapeTensor`], validating that the storage type is compatible with + /// the extension type. 
+ pub fn try_new( data_type: &DataType, - metadata: Self::Metadata, - ) -> Result { + metadata: ::Metadata, + ) -> Result { Ok(Self { inner: ::try_new(data_type, metadata)?, storage_type: data_type.clone(), diff --git a/datafusion/common/src/types/canonical_extensions/json.rs b/datafusion/common/src/types/canonical_extensions/json.rs index 3a4028c4c0cac..381edaa66708b 100644 --- a/datafusion/common/src/types/canonical_extensions/json.rs +++ b/datafusion/common/src/types/canonical_extensions/json.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use crate::Result; use crate::types::extension::DFExtensionType; use arrow::datatypes::DataType; -use arrow_schema::ArrowError; use arrow_schema::extension::{ExtensionType, Json}; /// Defines the extension type logic for the canonical `arrow.json` extension type. @@ -29,32 +29,13 @@ pub struct DFJson { storage_type: DataType, } -impl ExtensionType for DFJson { - const NAME: &'static str = Json::NAME; - type Metadata = ::Metadata; - - fn metadata(&self) -> &Self::Metadata { - self.inner.metadata() - } - - fn serialize_metadata(&self) -> Option { - self.inner.serialize_metadata() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - Json::deserialize_metadata(metadata) - } - - fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { - self.inner.supports_data_type(data_type) - } - - fn try_new( +impl DFJson { + /// Creates a new [`DFJson`], validating that the storage type is compatible with the + /// extension type. 
+ pub fn try_new( data_type: &DataType, - metadata: Self::Metadata, - ) -> Result { + metadata: ::Metadata, + ) -> Result { Ok(Self { inner: ::try_new(data_type, metadata)?, storage_type: data_type.clone(), diff --git a/datafusion/common/src/types/canonical_extensions/opaque.rs b/datafusion/common/src/types/canonical_extensions/opaque.rs index 5710612b51cea..fd41413419d1b 100644 --- a/datafusion/common/src/types/canonical_extensions/opaque.rs +++ b/datafusion/common/src/types/canonical_extensions/opaque.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use crate::Result; use crate::types::extension::DFExtensionType; use arrow::datatypes::DataType; -use arrow_schema::ArrowError; use arrow_schema::extension::{ExtensionType, Opaque}; /// Defines the extension type logic for the canonical `arrow.opaque` extension type. @@ -29,32 +29,13 @@ pub struct DFOpaque { storage_type: DataType, } -impl ExtensionType for DFOpaque { - const NAME: &'static str = Opaque::NAME; - type Metadata = ::Metadata; - - fn metadata(&self) -> &Self::Metadata { - self.inner.metadata() - } - - fn serialize_metadata(&self) -> Option { - self.inner.serialize_metadata() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - Opaque::deserialize_metadata(metadata) - } - - fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { - self.inner.supports_data_type(data_type) - } - - fn try_new( +impl DFOpaque { + /// Creates a new [`DFOpaque`], validating that the storage type is compatible with the + /// extension type. 
+ pub fn try_new( data_type: &DataType, - metadata: Self::Metadata, - ) -> Result { + metadata: ::Metadata, + ) -> Result { Ok(Self { inner: ::try_new(data_type, metadata)?, storage_type: data_type.clone(), diff --git a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs index 6df733a99dd2a..980741bb798cf 100644 --- a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs +++ b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::Result; use crate::ScalarValue; use crate::error::_internal_err; use crate::types::extension::DFExtensionType; @@ -39,32 +40,13 @@ pub struct DFTimestampWithOffset { storage_type: DataType, } -impl ExtensionType for DFTimestampWithOffset { - const NAME: &'static str = TimestampWithOffset::NAME; - type Metadata = ::Metadata; - - fn metadata(&self) -> &Self::Metadata { - self.inner.metadata() - } - - fn serialize_metadata(&self) -> Option { - self.inner.serialize_metadata() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - TimestampWithOffset::deserialize_metadata(metadata) - } - - fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { - self.inner.supports_data_type(data_type) - } - - fn try_new( +impl DFTimestampWithOffset { + /// Creates a new [`DFTimestampWithOffset`], validating that the storage type is compatible with + /// the extension type. 
+ pub fn try_new( data_type: &DataType, - metadata: Self::Metadata, - ) -> Result { + metadata: ::Metadata, + ) -> Result { Ok(Self { inner: ::try_new(data_type, metadata)?, storage_type: data_type.clone(), @@ -85,7 +67,7 @@ impl DFExtensionType for DFTimestampWithOffset { &self, array: &'fmt dyn Array, options: &FormatOptions<'fmt>, - ) -> crate::Result>> { + ) -> Result>> { if array.data_type() != &self.storage_type { return _internal_err!( "Unexpected data type for TimestampWithOffset: {}", diff --git a/datafusion/common/src/types/canonical_extensions/uuid.rs b/datafusion/common/src/types/canonical_extensions/uuid.rs index bb1762b6bfe16..c2b7a522345fa 100644 --- a/datafusion/common/src/types/canonical_extensions/uuid.rs +++ b/datafusion/common/src/types/canonical_extensions/uuid.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. +use crate::Result; use crate::error::_internal_err; use crate::types::extension::DFExtensionType; use arrow::array::{Array, FixedSizeBinaryArray}; use arrow::datatypes::DataType; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; -use arrow_schema::ArrowError; use arrow_schema::extension::{ExtensionType, Uuid}; use std::fmt::Write; use uuid::Bytes; @@ -31,32 +31,13 @@ use uuid::Bytes; #[derive(Debug, Clone)] pub struct DFUuid(Uuid); -impl ExtensionType for DFUuid { - const NAME: &'static str = Uuid::NAME; - type Metadata = ::Metadata; - - fn metadata(&self) -> &Self::Metadata { - self.0.metadata() - } - - fn serialize_metadata(&self) -> Option { - self.0.serialize_metadata() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - Uuid::deserialize_metadata(metadata) - } - - fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { - self.0.supports_data_type(data_type) - } - - fn try_new( +impl DFUuid { + /// Creates a new [`DFUuid`], validating that the storage type is compatible with the + /// extension 
type. + pub fn try_new( data_type: &DataType, - metadata: Self::Metadata, - ) -> Result { + metadata: ::Metadata, + ) -> Result { Ok(Self(::try_new(data_type, metadata)?)) } } @@ -74,7 +55,7 @@ impl DFExtensionType for DFUuid { &self, array: &'fmt dyn Array, options: &FormatOptions<'fmt>, - ) -> crate::Result>> { + ) -> Result>> { if array.data_type() != &DataType::FixedSizeBinary(16) { return _internal_err!("Wrong array type for Uuid"); } @@ -116,6 +97,7 @@ impl DisplayIndex for UuidValueDisplayIndex<'_> { mod tests { use super::*; use crate::ScalarValue; + use arrow_schema::ArrowError; #[test] pub fn test_pretty_print_uuid() -> Result<(), ArrowError> { diff --git a/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs b/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs index 0e8b7a45ec609..19efc4ae38522 100644 --- a/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs +++ b/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use crate::Result; use crate::types::extension::DFExtensionType; use arrow::datatypes::DataType; -use arrow_schema::ArrowError; use arrow_schema::extension::{ExtensionType, VariableShapeTensor}; /// Defines the extension type logic for the canonical `arrow.variable_shape_tensor` extension type. 
@@ -32,32 +32,13 @@ pub struct DFVariableShapeTensor { storage_type: DataType, } -impl ExtensionType for DFVariableShapeTensor { - const NAME: &'static str = VariableShapeTensor::NAME; - type Metadata = ::Metadata; - - fn metadata(&self) -> &Self::Metadata { - self.inner.metadata() - } - - fn serialize_metadata(&self) -> Option { - self.inner.serialize_metadata() - } - - fn deserialize_metadata( - metadata: Option<&str>, - ) -> Result { - VariableShapeTensor::deserialize_metadata(metadata) - } - - fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { - self.inner.supports_data_type(data_type) - } - - fn try_new( +impl DFVariableShapeTensor { + /// Creates a new [`DFVariableShapeTensor`], validating that the storage type is compatible with + /// the extension type. + pub fn try_new( data_type: &DataType, - metadata: Self::Metadata, - ) -> Result { + metadata: ::Metadata, + ) -> Result { Ok(Self { inner: ::try_new(data_type, metadata)?, storage_type: data_type.clone(), diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index e6cfea4ce5868..fe7a2416aa98a 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -22,9 +22,12 @@ use crate::planner::ExprPlanner; use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; use arrow::datatypes::Field; use arrow_schema::DataType; -use arrow_schema::extension::ExtensionType; +use arrow_schema::extension::{ + Bool8, ExtensionType, FixedShapeTensor, Json, Opaque, TimestampWithOffset, Uuid, + VariableShapeTensor, +}; use datafusion_common::types::{ - DFBool8, DFExtensionType, DFExtensionTypeRef, DFFixedShapeTensor, DFJson, DFOpaque, + DFBool8, DFExtensionTypeRef, DFFixedShapeTensor, DFJson, DFOpaque, DFTimestampWithOffset, DFUuid, DFVariableShapeTensor, }; use datafusion_common::{HashMap, Result, not_impl_err, plan_datafusion_err}; @@ -331,30 +334,29 @@ pub trait ExtensionTypeRegistry: Debug + Send + Sync { /// A factory that creates 
instances of extension types from a storage [`DataType`] and the /// metadata. -pub type ExtensionTypeFactory = dyn Fn(&DataType, ::Metadata) -> Result +pub type ExtensionTypeFactory = dyn Fn( + &DataType, + ::Metadata, + ) -> Result + Send + Sync; /// A default implementation of [ExtensionTypeRegistration] that parses the metadata from the -/// given extension type and passes it to a constructor function. -pub struct DefaultExtensionTypeRegistration< - TExtensionType: ExtensionType + DFExtensionType + 'static, -> { +/// given extension type and passes it to a constructor function. The [`ExtensionType::NAME`] is +/// used for registering the extension type. +pub struct DefaultExtensionTypeRegistration { /// A function that creates an instance of [`DFExtensionTypeRef`] from the storage type and the /// metadata. factory: Box>, } -impl +impl DefaultExtensionTypeRegistration { - /// Creates a new registration for an extension type. - /// - /// The factory is not required to validate the storage [`DataType`], as the compatibility will - /// be checked by the registration using [`ExtensionType::supports_data_type`]. However, the - /// factory may still choose to do so. + /// Creates a new registration for an extension type. The factory is required to validate that + /// the storage [`DataType`] is compatible with the extension type. 
pub fn new_arc( - factory: impl Fn(&DataType, TExtensionType::Metadata) -> Result + factory: impl Fn(&DataType, TExtensionType::Metadata) -> Result + Send + Sync + 'static, @@ -365,7 +367,7 @@ impl } } -impl ExtensionTypeRegistration +impl ExtensionTypeRegistration for DefaultExtensionTypeRegistration { fn type_name(&self) -> &str { @@ -378,22 +380,11 @@ impl ExtensionTypeRegistration metadata: Option<&str>, ) -> Result { let metadata = TExtensionType::deserialize_metadata(metadata)?; - let type_instance = self.factory.as_ref()(storage_type, metadata)?; - type_instance - .supports_data_type(storage_type) - .map_err(|_| { - plan_datafusion_err!( - "Extension type {} obtained from registration does not support the storage data type {}", - TExtensionType::NAME, - storage_type - ) - })?; - - Ok(Arc::new(type_instance) as DFExtensionTypeRef) + self.factory.as_ref()(storage_type, metadata) } } -impl Debug +impl Debug for DefaultExtensionTypeRegistration { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -428,27 +419,50 @@ impl MemoryExtensionTypeRegistry { /// in the extension type registry. pub fn new_with_canonical_extension_types() -> Self { let mapping = [ - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(DFFixedShapeTensor::try_new(storage_type, metadata)?) - }), - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(DFVariableShapeTensor::try_new(storage_type, metadata)?) - }), - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(DFJson::try_new(storage_type, metadata)?) - }), - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(DFUuid::try_new(storage_type, metadata)?) - }), - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(DFOpaque::try_new(storage_type, metadata)?) - }), - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(DFBool8::try_new(storage_type, metadata)?) 
- }), - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(DFTimestampWithOffset::try_new(storage_type, metadata)?) - }), + DefaultExtensionTypeRegistration::::new_arc( + |storage_type, metadata| { + Ok(Arc::new(DFFixedShapeTensor::try_new( + storage_type, + metadata, + )?)) + }, + ), + DefaultExtensionTypeRegistration::::new_arc( + |storage_type, metadata| { + Ok(Arc::new(DFVariableShapeTensor::try_new( + storage_type, + metadata, + )?)) + }, + ), + DefaultExtensionTypeRegistration::::new_arc( + |storage_type, metadata| { + Ok(Arc::new(DFJson::try_new(storage_type, metadata)?)) + }, + ), + DefaultExtensionTypeRegistration::::new_arc( + |storage_type, metadata| { + Ok(Arc::new(DFUuid::try_new(storage_type, metadata)?)) + }, + ), + DefaultExtensionTypeRegistration::::new_arc( + |storage_type, metadata| { + Ok(Arc::new(DFOpaque::try_new(storage_type, metadata)?)) + }, + ), + DefaultExtensionTypeRegistration::::new_arc( + |storage_type, metadata| { + Ok(Arc::new(DFBool8::try_new(storage_type, metadata)?)) + }, + ), + DefaultExtensionTypeRegistration::::new_arc( + |storage_type, metadata| { + Ok(Arc::new(DFTimestampWithOffset::try_new( + storage_type, + metadata, + )?)) + }, + ), ]; let mut extension_types = HashMap::new(); From f7461a6d720c0f185d7c4f673c1b3b4e55ad095e Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 2 Apr 2026 10:53:48 +0200 Subject: [PATCH 10/13] Fix temperature example --- .../examples/extension_types/temperature.rs | 47 +++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/datafusion-examples/examples/extension_types/temperature.rs b/datafusion-examples/examples/extension_types/temperature.rs index fe497aad65ce8..50c5933c76b74 100644 --- a/datafusion-examples/examples/extension_types/temperature.rs +++ b/datafusion-examples/examples/extension_types/temperature.rs @@ -51,12 +51,14 @@ fn create_session_context() -> Result { // The registration creates a new instance of the extension 
type with the deserialized metadata. let temp_registration = - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(TemperatureExtensionType::new( - storage_type.clone(), - metadata, - )) - }); + DefaultExtensionTypeRegistration::::new_arc( + |storage_type, metadata| { + Ok(Arc::new(TemperatureExtensionType::try_new( + storage_type, + metadata, + )?)) + }, + ); registry.add_extension_type_registration(temp_registration)?; let state = SessionStateBuilder::default() @@ -97,13 +99,25 @@ fn example_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("city", DataType::Utf8, false), Field::new("celsius", DataType::Float64, false).with_extension_type( - TemperatureExtensionType::new(DataType::Float64, TemperatureUnit::Celsius), + TemperatureExtensionType::try_new( + &DataType::Float64, + TemperatureUnit::Celsius, + ) + .expect("Valid Type"), ), Field::new("fahrenheit", DataType::Float64, false).with_extension_type( - TemperatureExtensionType::new(DataType::Float64, TemperatureUnit::Fahrenheit), + TemperatureExtensionType::try_new( + &DataType::Float64, + TemperatureUnit::Fahrenheit, + ) + .expect("Valid Type"), ), Field::new("kelvin", DataType::Float32, false).with_extension_type( - TemperatureExtensionType::new(DataType::Float32, TemperatureUnit::Kelvin), + TemperatureExtensionType::try_new( + &DataType::Float32, + TemperatureUnit::Kelvin, + ) + .expect("Valid Type"), ), ])) } @@ -144,11 +158,16 @@ pub struct TemperatureExtensionType { impl TemperatureExtensionType { /// Creates a new [`TemperatureExtensionType`]. 
- pub fn new(storage_type: DataType, temperature_unit: TemperatureUnit) -> Self { - Self { - storage_type, + pub fn try_new( + storage_type: &DataType, + temperature_unit: TemperatureUnit, + ) -> Result { + let result = Self { + storage_type: storage_type.clone(), temperature_unit, - } + }; + result.supports_data_type(&storage_type)?; // Validate the storage type + Ok(result) } } @@ -212,7 +231,7 @@ impl ExtensionType for TemperatureExtensionType { data_type: &DataType, metadata: Self::Metadata, ) -> std::result::Result { - let instance = Self::new(data_type.clone(), metadata); + let instance = Self::try_new(data_type, metadata)?; instance.supports_data_type(data_type)?; Ok(instance) } From 8e63149d5ea1eba54f5a30007dc72a41ae688a99 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 2 Apr 2026 11:19:32 +0200 Subject: [PATCH 11/13] Fix CI --- datafusion-examples/examples/extension_types/temperature.rs | 2 +- datafusion/expr/src/registry.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion-examples/examples/extension_types/temperature.rs b/datafusion-examples/examples/extension_types/temperature.rs index 50c5933c76b74..8b6c6ab8df8e5 100644 --- a/datafusion-examples/examples/extension_types/temperature.rs +++ b/datafusion-examples/examples/extension_types/temperature.rs @@ -166,7 +166,7 @@ impl TemperatureExtensionType { storage_type: storage_type.clone(), temperature_unit, }; - result.supports_data_type(&storage_type)?; // Validate the storage type + result.supports_data_type(storage_type)?; // Validate the storage type Ok(result) } } diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index fe7a2416aa98a..bb2985c7e5ae5 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -246,6 +246,8 @@ pub type ExtensionTypeRegistrationRef = Arc; /// field and create the corresponding [`DFExtensionType`] instance with the correct `n` set. 
/// /// The [`DefaultExtensionTypeRegistration`] provides a convenient way of creating registrations. +/// +/// [`DFExtensionType`]: datafusion_common::types::DFExtensionType pub trait ExtensionTypeRegistration: Debug + Send + Sync { /// The name of the extension type. /// From 28578aff3c841f222a09ec6d29d424b377015ac1 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 2 Apr 2026 19:46:29 +0200 Subject: [PATCH 12/13] Remove requirement to arrow-rs ExtensionType --- .../examples/extension_types/temperature.rs | 144 ++++++++---------- datafusion/expr/src/registry.rs | 85 ++++++----- 2 files changed, 109 insertions(+), 120 deletions(-) diff --git a/datafusion-examples/examples/extension_types/temperature.rs b/datafusion-examples/examples/extension_types/temperature.rs index 8b6c6ab8df8e5..c126088eea5f6 100644 --- a/datafusion-examples/examples/extension_types/temperature.rs +++ b/datafusion-examples/examples/extension_types/temperature.rs @@ -21,7 +21,7 @@ use arrow::array::{ }; use arrow::datatypes::{Float32Type, Float64Type}; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; -use arrow_schema::extension::ExtensionType; +use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY}; use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef}; use datafusion::dataframe::DataFrame; use datafusion::error::Result; @@ -32,6 +32,7 @@ use datafusion_common::types::DFExtensionType; use datafusion_expr::registry::{ DefaultExtensionTypeRegistration, ExtensionTypeRegistry, MemoryExtensionTypeRegistry, }; +use std::collections::HashMap; use std::fmt::{Display, Write}; use std::sync::Arc; @@ -50,15 +51,15 @@ fn create_session_context() -> Result { let registry = MemoryExtensionTypeRegistry::new_empty(); // The registration creates a new instance of the extension type with the deserialized metadata. 
- let temp_registration = - DefaultExtensionTypeRegistration::::new_arc( - |storage_type, metadata| { - Ok(Arc::new(TemperatureExtensionType::try_new( - storage_type, - metadata, - )?)) - }, - ); + let temp_registration = DefaultExtensionTypeRegistration::new_arc( + TemperatureExtensionType::NAME, + |storage_type, metadata| { + Ok(Arc::new(TemperatureExtensionType::try_new( + storage_type, + TemperatureUnit::try_from(metadata)?, + )?)) + }, + ); registry.add_extension_type_registration(temp_registration)?; let state = SessionStateBuilder::default() @@ -98,38 +99,15 @@ async fn register_temperature_table(ctx: &SessionContext) -> Result { fn example_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("city", DataType::Utf8, false), - Field::new("celsius", DataType::Float64, false).with_extension_type( - TemperatureExtensionType::try_new( - &DataType::Float64, - TemperatureUnit::Celsius, - ) - .expect("Valid Type"), - ), - Field::new("fahrenheit", DataType::Float64, false).with_extension_type( - TemperatureExtensionType::try_new( - &DataType::Float64, - TemperatureUnit::Fahrenheit, - ) - .expect("Valid Type"), - ), - Field::new("kelvin", DataType::Float32, false).with_extension_type( - TemperatureExtensionType::try_new( - &DataType::Float32, - TemperatureUnit::Kelvin, - ) - .expect("Valid Type"), - ), + Field::new("celsius", DataType::Float64, false) + .with_metadata(create_metadata(TemperatureUnit::Celsius)), + Field::new("fahrenheit", DataType::Float64, false) + .with_metadata(create_metadata(TemperatureUnit::Fahrenheit)), + Field::new("kelvin", DataType::Float32, false) + .with_metadata(create_metadata(TemperatureUnit::Kelvin)), ])) } -/// Represents the unit of a temperature reading. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TemperatureUnit { - Celsius, - Fahrenheit, - Kelvin, -} - /// Represents a float that semantically represents a temperature. The temperature can be one of /// the supported [`TemperatureUnit`]s. 
/// @@ -157,51 +135,61 @@ pub struct TemperatureExtensionType { } impl TemperatureExtensionType { + /// The name of the extension type. + pub const NAME: &'static str = "custom.temperature"; + /// Creates a new [`TemperatureExtensionType`]. pub fn try_new( storage_type: &DataType, temperature_unit: TemperatureUnit, - ) -> Result { + ) -> Result { + match storage_type { + DataType::Float32 | DataType::Float64 => {} + _ => { + return Err(ArrowError::InvalidArgumentError(format!( + "Invalid data type: {storage_type} for temperature type, expected Float32 or Float64", + ))); + } + } + let result = Self { storage_type: storage_type.clone(), temperature_unit, }; - result.supports_data_type(storage_type)?; // Validate the storage type Ok(result) } } -/// Implementation of [`ExtensionType`] for [`TemperatureExtensionType`]. -/// -/// This implements the arrow-rs trait for reading, writing, and validating extension types. -impl ExtensionType for TemperatureExtensionType { - /// Arrow extension type name that is stored in the `ARROW:extension:name` field. - const NAME: &'static str = "custom.temperature"; - type Metadata = TemperatureUnit; - - fn metadata(&self) -> &Self::Metadata { - &self.temperature_unit - } +/// Represents the unit of a temperature reading. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TemperatureUnit { + Celsius, + Fahrenheit, + Kelvin, +} +impl TemperatureUnit { /// Arrow extension type metadata is encoded as a string and stored using the /// `ARROW:extension:metadata` key. As we only store the name of the unit, a simple string /// suffices. Extension types can store more complex metadata using serialization formats like /// JSON. 
- fn serialize_metadata(&self) -> Option { - let s = match self.temperature_unit { + pub fn serialize(self) -> String { + let result = match self { TemperatureUnit::Celsius => "celsius", TemperatureUnit::Fahrenheit => "fahrenheit", TemperatureUnit::Kelvin => "kelvin", }; - Some(s.to_string()) + result.to_owned() } +} - /// Inverse operation of [`Self::serialize_metadata`]. This creates the [`TemperatureUnit`] - /// value from the serialized string. - fn deserialize_metadata( - metadata: Option<&str>, - ) -> std::result::Result { - match metadata { +/// Inverse operation of [`TemperatureUnit::serialize`]. This creates the [`TemperatureUnit`] +/// value from the serialized string. +impl TryFrom> for TemperatureUnit { + type Error = ArrowError; + + fn try_from(value: Option<&str>) -> std::result::Result { + match value { Some("celsius") => Ok(TemperatureUnit::Celsius), Some("fahrenheit") => Ok(TemperatureUnit::Fahrenheit), Some("kelvin") => Ok(TemperatureUnit::Kelvin), @@ -213,28 +201,18 @@ impl ExtensionType for TemperatureExtensionType { )), } } +} - /// Checks that the extension type supports a given [`DataType`]. - fn supports_data_type( - &self, - data_type: &DataType, - ) -> std::result::Result<(), ArrowError> { - match data_type { - DataType::Float32 | DataType::Float64 => Ok(()), - _ => Err(ArrowError::InvalidArgumentError(format!( - "Invalid data type: {data_type} for temperature type, expected Float32 or Float64", - ))), - } - } - - fn try_new( - data_type: &DataType, - metadata: Self::Metadata, - ) -> std::result::Result { - let instance = Self::try_new(data_type, metadata)?; - instance.supports_data_type(data_type)?; - Ok(instance) - } +/// This creates a metadata map for the temperature type. Another way of writing the metadata can be +/// implemented using arrow-rs' [`ExtensionType`](arrow_schema::extension::ExtensionType) trait. 
+fn create_metadata(unit: TemperatureUnit) -> HashMap { + HashMap::from([ + ( + EXTENSION_TYPE_NAME_KEY.to_owned(), + TemperatureExtensionType::NAME.to_owned(), + ), + (EXTENSION_TYPE_METADATA_KEY.to_owned(), unit.serialize()), + ]) } /// Implementation of [`DFExtensionType`] for [`TemperatureExtensionType`]. @@ -246,7 +224,7 @@ impl DFExtensionType for TemperatureExtensionType { } fn serialize_metadata(&self) -> Option { - ExtensionType::serialize_metadata(self) + Some(self.temperature_unit.serialize()) } fn create_array_formatter<'fmt>( diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index bb2985c7e5ae5..f034371295aba 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -336,44 +336,39 @@ pub trait ExtensionTypeRegistry: Debug + Send + Sync { /// A factory that creates instances of extension types from a storage [`DataType`] and the /// metadata. -pub type ExtensionTypeFactory = dyn Fn( - &DataType, - ::Metadata, - ) -> Result - + Send - + Sync; +pub type ExtensionTypeFactory = + dyn Fn(&DataType, Option<&str>) -> Result + Send + Sync; /// A default implementation of [ExtensionTypeRegistration] that parses the metadata from the -/// given extension type and passes it to a constructor function. The [`ExtensionType::NAME`] is -/// used for registering the extension type. -pub struct DefaultExtensionTypeRegistration { +/// given extension type and passes it to a constructor function. +pub struct DefaultExtensionTypeRegistration { + /// The name of the extension type. + name: String, /// A function that creates an instance of [`DFExtensionTypeRef`] from the storage type and the /// metadata. - factory: Box>, + factory: Box, } -impl - DefaultExtensionTypeRegistration -{ +impl DefaultExtensionTypeRegistration { /// Creates a new registration for an extension type. The factory is required to validate that /// the storage [`DataType`] is compatible with the extension type. 
pub fn new_arc( - factory: impl Fn(&DataType, TExtensionType::Metadata) -> Result + name: impl Into, + factory: impl Fn(&DataType, Option<&str>) -> Result + Send + Sync + 'static, ) -> ExtensionTypeRegistrationRef { Arc::new(Self { + name: name.into(), factory: Box::new(factory), }) } } -impl ExtensionTypeRegistration - for DefaultExtensionTypeRegistration -{ +impl ExtensionTypeRegistration for DefaultExtensionTypeRegistration { fn type_name(&self) -> &str { - TExtensionType::NAME + &self.name } fn create_df_extension_type( @@ -381,17 +376,14 @@ impl ExtensionTypeRegistration storage_type: &DataType, metadata: Option<&str>, ) -> Result { - let metadata = TExtensionType::deserialize_metadata(metadata)?; self.factory.as_ref()(storage_type, metadata) } } -impl Debug - for DefaultExtensionTypeRegistration -{ +impl Debug for DefaultExtensionTypeRegistration { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("DefaultExtensionTypeRegistration") - .field("type_name", &TExtensionType::NAME) + .field("type_name", &self.name) .finish() } } @@ -421,47 +413,66 @@ impl MemoryExtensionTypeRegistry { /// in the extension type registry. 
pub fn new_with_canonical_extension_types() -> Self { let mapping = [ - DefaultExtensionTypeRegistration::::new_arc( + DefaultExtensionTypeRegistration::new_arc( + FixedShapeTensor::NAME, |storage_type, metadata| { Ok(Arc::new(DFFixedShapeTensor::try_new( storage_type, - metadata, + FixedShapeTensor::deserialize_metadata(metadata)?, )?)) }, ), - DefaultExtensionTypeRegistration::::new_arc( + DefaultExtensionTypeRegistration::new_arc( + VariableShapeTensor::NAME, |storage_type, metadata| { Ok(Arc::new(DFVariableShapeTensor::try_new( storage_type, - metadata, + VariableShapeTensor::deserialize_metadata(metadata)?, )?)) }, ), - DefaultExtensionTypeRegistration::::new_arc( + DefaultExtensionTypeRegistration::new_arc( + Json::NAME, |storage_type, metadata| { - Ok(Arc::new(DFJson::try_new(storage_type, metadata)?)) + Ok(Arc::new(DFJson::try_new( + storage_type, + Json::deserialize_metadata(metadata)?, + )?)) }, ), - DefaultExtensionTypeRegistration::::new_arc( + DefaultExtensionTypeRegistration::new_arc( + Uuid::NAME, |storage_type, metadata| { - Ok(Arc::new(DFUuid::try_new(storage_type, metadata)?)) + Ok(Arc::new(DFUuid::try_new( + storage_type, + Uuid::deserialize_metadata(metadata)?, + )?)) }, ), - DefaultExtensionTypeRegistration::::new_arc( + DefaultExtensionTypeRegistration::new_arc( + Opaque::NAME, |storage_type, metadata| { - Ok(Arc::new(DFOpaque::try_new(storage_type, metadata)?)) + Ok(Arc::new(DFOpaque::try_new( + storage_type, + Opaque::deserialize_metadata(metadata)?, + )?)) }, ), - DefaultExtensionTypeRegistration::::new_arc( + DefaultExtensionTypeRegistration::new_arc( + Bool8::NAME, |storage_type, metadata| { - Ok(Arc::new(DFBool8::try_new(storage_type, metadata)?)) + Ok(Arc::new(DFBool8::try_new( + storage_type, + Bool8::deserialize_metadata(metadata)?, + )?)) }, ), - DefaultExtensionTypeRegistration::::new_arc( + DefaultExtensionTypeRegistration::new_arc( + TimestampWithOffset::NAME, |storage_type, metadata| { Ok(Arc::new(DFTimestampWithOffset::try_new( 
storage_type, - metadata, + TimestampWithOffset::deserialize_metadata(metadata)?, )?)) }, ), From 7b713490c7061de688a5cc85c1abbb60f025a87c Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 2 Apr 2026 19:55:43 +0200 Subject: [PATCH 13/13] Make ExtensionTypeRegistrations less complicated --- .../examples/extension_types/temperature.rs | 4 +- datafusion/expr/src/registry.rs | 148 ++++++++---------- 2 files changed, 63 insertions(+), 89 deletions(-) diff --git a/datafusion-examples/examples/extension_types/temperature.rs b/datafusion-examples/examples/extension_types/temperature.rs index c126088eea5f6..bbfa123a0ad28 100644 --- a/datafusion-examples/examples/extension_types/temperature.rs +++ b/datafusion-examples/examples/extension_types/temperature.rs @@ -30,7 +30,7 @@ use datafusion::prelude::SessionContext; use datafusion_common::internal_err; use datafusion_common::types::DFExtensionType; use datafusion_expr::registry::{ - DefaultExtensionTypeRegistration, ExtensionTypeRegistry, MemoryExtensionTypeRegistry, + ExtensionTypeRegistration, ExtensionTypeRegistry, MemoryExtensionTypeRegistry, }; use std::collections::HashMap; use std::fmt::{Display, Write}; @@ -51,7 +51,7 @@ fn create_session_context() -> Result { let registry = MemoryExtensionTypeRegistry::new_empty(); // The registration creates a new instance of the extension type with the deserialized metadata. - let temp_registration = DefaultExtensionTypeRegistration::new_arc( + let temp_registration = ExtensionTypeRegistration::new_arc( TemperatureExtensionType::NAME, |storage_type, metadata| { Ok(Arc::new(TemperatureExtensionType::try_new( diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index f034371295aba..4be163a4bd8ea 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -226,44 +226,6 @@ impl FunctionRegistry for MemoryFunctionRegistry { } } -/// A cheaply cloneable pointer to an [ExtensionTypeRegistration]. 
-pub type ExtensionTypeRegistrationRef = Arc; - -/// The registration of an extension type. Implementations of this trait are responsible for -/// *creating* instances of [`DFExtensionType`] that represent the entire semantics of an extension -/// type. -/// -/// # Why do we need a Registration? -/// -/// A good question is why this trait is even necessary. Why not directly register the -/// [`DFExtensionType`] in a registration? -/// -/// While this works for extension types requiring no additional metadata (e.g., `arrow.uuid`), it -/// does not work for more complex extension types with metadata. For example, consider an extension -/// type `custom.shortened(n)` that aims to short the pretty-printing string to `n` characters. -/// Here, `n` is a parameter of the extension type and should be a field in the struct that -/// implements the [`DFExtensionType`]. The job of the registration is to read the metadata from the -/// field and create the corresponding [`DFExtensionType`] instance with the correct `n` set. -/// -/// The [`DefaultExtensionTypeRegistration`] provides a convenient way of creating registrations. -/// -/// [`DFExtensionType`]: datafusion_common::types::DFExtensionType -pub trait ExtensionTypeRegistration: Debug + Send + Sync { - /// The name of the extension type. - /// - /// This name will be used to find the correct [ExtensionTypeRegistration] when an extension - /// type is encountered. - fn type_name(&self) -> &str; - - /// Creates an extension type instance from the optional metadata. The name of the extension - /// type is not a parameter as it's already defined by the registration itself. - fn create_df_extension_type( - &self, - storage_type: &DataType, - metadata: Option<&str>, - ) -> Result; -} - /// A cheaply cloneable pointer to an [ExtensionTypeRegistry]. pub type ExtensionTypeRegistryRef = Arc; @@ -300,7 +262,7 @@ pub trait ExtensionTypeRegistry: Debug + Send + Sync { } /// Returns all registered [ExtensionTypeRegistration]. 
- fn extension_type_registrations(&self) -> Vec>; + fn extension_type_registrations(&self) -> Vec; /// Registers a new [ExtensionTypeRegistrationRef], returning any previously registered /// implementation. @@ -339,9 +301,27 @@ pub trait ExtensionTypeRegistry: Debug + Send + Sync { pub type ExtensionTypeFactory = dyn Fn(&DataType, Option<&str>) -> Result + Send + Sync; -/// A default implementation of [ExtensionTypeRegistration] that parses the metadata from the -/// given extension type and passes it to a constructor function. -pub struct DefaultExtensionTypeRegistration { +/// A cheaply cloneable pointer to an [ExtensionTypeRegistration]. +pub type ExtensionTypeRegistrationRef = Arc; + +/// The registration of an extension type. Instances of this struct are responsible for +/// *creating* instances of [`DFExtensionType`] that represent the entire semantics of an extension +/// type. +/// +/// # Why do we need a Registration? +/// +/// A good question is why this struct is even necessary. Why not directly register the +/// [`DFExtensionType`] in a registry? +/// +/// While this works for extension types requiring no additional metadata (e.g., `arrow.uuid`), it +/// does not work for more complex extension types with metadata. For example, consider an extension +/// type `custom.shortened(n)` that aims to shorten the pretty-printing string to `n` characters. +/// Here, `n` is a parameter of the extension type and should be a field in the struct that +/// implements the [`DFExtensionType`]. The job of the registration is to read the metadata from the +/// field and create the corresponding [`DFExtensionType`] instance with the correct `n` set. +/// +/// [`DFExtensionType`]: datafusion_common::types::DFExtensionType +pub struct ExtensionTypeRegistration { /// The name of the extension type. 
name: String, /// A function that creates an instance of [`DFExtensionTypeRef`] from the storage type and the @@ -349,7 +329,7 @@ pub struct DefaultExtensionTypeRegistration { factory: Box, } -impl DefaultExtensionTypeRegistration { +impl ExtensionTypeRegistration { /// Creates a new registration for an extension type. The factory is required to validate that /// the storage [`DataType`] is compatible with the extension type. pub fn new_arc( @@ -366,12 +346,18 @@ impl DefaultExtensionTypeRegistration { } } -impl ExtensionTypeRegistration for DefaultExtensionTypeRegistration { - fn type_name(&self) -> &str { +impl ExtensionTypeRegistration { + /// The name of the extension type. + /// + /// This name will be used to find the correct [ExtensionTypeRegistration] when an extension + /// type is encountered. + pub fn type_name(&self) -> &str { &self.name } - fn create_df_extension_type( + /// Creates an extension type instance from the optional metadata. The name of the extension + /// type is not a parameter as it's already defined by the registration itself. + pub fn create_df_extension_type( &self, storage_type: &DataType, metadata: Option<&str>, @@ -380,7 +366,7 @@ impl ExtensionTypeRegistration for DefaultExtensionTypeRegistration { } } -impl Debug for DefaultExtensionTypeRegistration { +impl Debug for ExtensionTypeRegistration { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("DefaultExtensionTypeRegistration") .field("type_name", &self.name) @@ -413,7 +399,7 @@ impl MemoryExtensionTypeRegistry { /// in the extension type registry. 
pub fn new_with_canonical_extension_types() -> Self { let mapping = [ - DefaultExtensionTypeRegistration::new_arc( + ExtensionTypeRegistration::new_arc( FixedShapeTensor::NAME, |storage_type, metadata| { Ok(Arc::new(DFFixedShapeTensor::try_new( @@ -422,7 +408,7 @@ impl MemoryExtensionTypeRegistry { )?)) }, ), - DefaultExtensionTypeRegistration::new_arc( + ExtensionTypeRegistration::new_arc( VariableShapeTensor::NAME, |storage_type, metadata| { Ok(Arc::new(DFVariableShapeTensor::try_new( @@ -431,43 +417,31 @@ impl MemoryExtensionTypeRegistry { )?)) }, ), - DefaultExtensionTypeRegistration::new_arc( - Json::NAME, - |storage_type, metadata| { - Ok(Arc::new(DFJson::try_new( - storage_type, - Json::deserialize_metadata(metadata)?, - )?)) - }, - ), - DefaultExtensionTypeRegistration::new_arc( - Uuid::NAME, - |storage_type, metadata| { - Ok(Arc::new(DFUuid::try_new( - storage_type, - Uuid::deserialize_metadata(metadata)?, - )?)) - }, - ), - DefaultExtensionTypeRegistration::new_arc( - Opaque::NAME, - |storage_type, metadata| { - Ok(Arc::new(DFOpaque::try_new( - storage_type, - Opaque::deserialize_metadata(metadata)?, - )?)) - }, - ), - DefaultExtensionTypeRegistration::new_arc( - Bool8::NAME, - |storage_type, metadata| { - Ok(Arc::new(DFBool8::try_new( - storage_type, - Bool8::deserialize_metadata(metadata)?, - )?)) - }, - ), - DefaultExtensionTypeRegistration::new_arc( + ExtensionTypeRegistration::new_arc(Json::NAME, |storage_type, metadata| { + Ok(Arc::new(DFJson::try_new( + storage_type, + Json::deserialize_metadata(metadata)?, + )?)) + }), + ExtensionTypeRegistration::new_arc(Uuid::NAME, |storage_type, metadata| { + Ok(Arc::new(DFUuid::try_new( + storage_type, + Uuid::deserialize_metadata(metadata)?, + )?)) + }), + ExtensionTypeRegistration::new_arc(Opaque::NAME, |storage_type, metadata| { + Ok(Arc::new(DFOpaque::try_new( + storage_type, + Opaque::deserialize_metadata(metadata)?, + )?)) + }), + ExtensionTypeRegistration::new_arc(Bool8::NAME, |storage_type, metadata| { 
+ Ok(Arc::new(DFBool8::try_new( + storage_type, + Bool8::deserialize_metadata(metadata)?, + )?)) + }), + ExtensionTypeRegistration::new_arc( TimestampWithOffset::NAME, |storage_type, metadata| { Ok(Arc::new(DFTimestampWithOffset::try_new( @@ -529,7 +503,7 @@ impl ExtensionTypeRegistry for MemoryExtensionTypeRegistry { .cloned() } - fn extension_type_registrations(&self) -> Vec> { + fn extension_type_registrations(&self) -> Vec { self.extension_types .read() .expect("Extension type registry lock poisoned")