diff --git a/datafusion-examples/examples/extension_types/temperature.rs b/datafusion-examples/examples/extension_types/temperature.rs index fe497aad65ce8..bbfa123a0ad28 100644 --- a/datafusion-examples/examples/extension_types/temperature.rs +++ b/datafusion-examples/examples/extension_types/temperature.rs @@ -21,7 +21,7 @@ use arrow::array::{ }; use arrow::datatypes::{Float32Type, Float64Type}; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; -use arrow_schema::extension::ExtensionType; +use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY}; use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef}; use datafusion::dataframe::DataFrame; use datafusion::error::Result; @@ -30,8 +30,9 @@ use datafusion::prelude::SessionContext; use datafusion_common::internal_err; use datafusion_common::types::DFExtensionType; use datafusion_expr::registry::{ - DefaultExtensionTypeRegistration, ExtensionTypeRegistry, MemoryExtensionTypeRegistry, + ExtensionTypeRegistration, ExtensionTypeRegistry, MemoryExtensionTypeRegistry, }; +use std::collections::HashMap; use std::fmt::{Display, Write}; use std::sync::Arc; @@ -50,13 +51,15 @@ fn create_session_context() -> Result { let registry = MemoryExtensionTypeRegistry::new_empty(); // The registration creates a new instance of the extension type with the deserialized metadata. 
- let temp_registration = - DefaultExtensionTypeRegistration::new_arc(|storage_type, metadata| { - Ok(TemperatureExtensionType::new( - storage_type.clone(), - metadata, - )) - }); + let temp_registration = ExtensionTypeRegistration::new_arc( + TemperatureExtensionType::NAME, + |storage_type, metadata| { + Ok(Arc::new(TemperatureExtensionType::try_new( + storage_type, + TemperatureUnit::try_from(metadata)?, + )?)) + }, + ); registry.add_extension_type_registration(temp_registration)?; let state = SessionStateBuilder::default() @@ -96,26 +99,15 @@ async fn register_temperature_table(ctx: &SessionContext) -> Result { fn example_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("city", DataType::Utf8, false), - Field::new("celsius", DataType::Float64, false).with_extension_type( - TemperatureExtensionType::new(DataType::Float64, TemperatureUnit::Celsius), - ), - Field::new("fahrenheit", DataType::Float64, false).with_extension_type( - TemperatureExtensionType::new(DataType::Float64, TemperatureUnit::Fahrenheit), - ), - Field::new("kelvin", DataType::Float32, false).with_extension_type( - TemperatureExtensionType::new(DataType::Float32, TemperatureUnit::Kelvin), - ), + Field::new("celsius", DataType::Float64, false) + .with_metadata(create_metadata(TemperatureUnit::Celsius)), + Field::new("fahrenheit", DataType::Float64, false) + .with_metadata(create_metadata(TemperatureUnit::Fahrenheit)), + Field::new("kelvin", DataType::Float32, false) + .with_metadata(create_metadata(TemperatureUnit::Kelvin)), ])) } -/// Represents the unit of a temperature reading. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TemperatureUnit { - Celsius, - Fahrenheit, - Kelvin, -} - /// Represents a float that semantically represents a temperature. The temperature can be one of /// the supported [`TemperatureUnit`]s. /// @@ -143,46 +135,61 @@ pub struct TemperatureExtensionType { } impl TemperatureExtensionType { + /// The name of the extension type. 
+ pub const NAME: &'static str = "custom.temperature"; + /// Creates a new [`TemperatureExtensionType`]. - pub fn new(storage_type: DataType, temperature_unit: TemperatureUnit) -> Self { - Self { - storage_type, - temperature_unit, + pub fn try_new( + storage_type: &DataType, + temperature_unit: TemperatureUnit, + ) -> Result { + match storage_type { + DataType::Float32 | DataType::Float64 => {} + _ => { + return Err(ArrowError::InvalidArgumentError(format!( + "Invalid data type: {storage_type} for temperature type, expected Float32 or Float64", + ))); + } } + + let result = Self { + storage_type: storage_type.clone(), + temperature_unit, + }; + Ok(result) } } -/// Implementation of [`ExtensionType`] for [`TemperatureExtensionType`]. -/// -/// This implements the arrow-rs trait for reading, writing, and validating extension types. -impl ExtensionType for TemperatureExtensionType { - /// Arrow extension type name that is stored in the `ARROW:extension:name` field. - const NAME: &'static str = "custom.temperature"; - type Metadata = TemperatureUnit; - - fn metadata(&self) -> &Self::Metadata { - &self.temperature_unit - } +/// Represents the unit of a temperature reading. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TemperatureUnit { + Celsius, + Fahrenheit, + Kelvin, +} +impl TemperatureUnit { /// Arrow extension type metadata is encoded as a string and stored using the /// `ARROW:extension:metadata` key. As we only store the name of the unit, a simple string /// suffices. Extension types can store more complex metadata using serialization formats like /// JSON. - fn serialize_metadata(&self) -> Option { - let s = match self.temperature_unit { + pub fn serialize(self) -> String { + let result = match self { TemperatureUnit::Celsius => "celsius", TemperatureUnit::Fahrenheit => "fahrenheit", TemperatureUnit::Kelvin => "kelvin", }; - Some(s.to_string()) + result.to_owned() } +} - /// Inverse operation of [`Self::serialize_metadata`]. 
This creates the [`TemperatureUnit`] - /// value from the serialized string. - fn deserialize_metadata( - metadata: Option<&str>, - ) -> std::result::Result { - match metadata { +/// Inverse operation of [`TemperatureUnit::serialize`]. This creates the [`TemperatureUnit`] +/// value from the serialized string. +impl TryFrom> for TemperatureUnit { + type Error = ArrowError; + + fn try_from(value: Option<&str>) -> std::result::Result { + match value { Some("celsius") => Ok(TemperatureUnit::Celsius), Some("fahrenheit") => Ok(TemperatureUnit::Fahrenheit), Some("kelvin") => Ok(TemperatureUnit::Kelvin), @@ -194,28 +201,18 @@ impl ExtensionType for TemperatureExtensionType { )), } } +} - /// Checks that the extension type supports a given [`DataType`]. - fn supports_data_type( - &self, - data_type: &DataType, - ) -> std::result::Result<(), ArrowError> { - match data_type { - DataType::Float32 | DataType::Float64 => Ok(()), - _ => Err(ArrowError::InvalidArgumentError(format!( - "Invalid data type: {data_type} for temperature type, expected Float32 or Float64", - ))), - } - } - - fn try_new( - data_type: &DataType, - metadata: Self::Metadata, - ) -> std::result::Result { - let instance = Self::new(data_type.clone(), metadata); - instance.supports_data_type(data_type)?; - Ok(instance) - } +/// This creates a metadata map for the temperature type. Another way of writing the metadata can be +/// implemented using arrow-rs' [`ExtensionType`](arrow_schema::extension::ExtensionType) trait. +fn create_metadata(unit: TemperatureUnit) -> HashMap { + HashMap::from([ + ( + EXTENSION_TYPE_NAME_KEY.to_owned(), + TemperatureExtensionType::NAME.to_owned(), + ), + (EXTENSION_TYPE_METADATA_KEY.to_owned(), unit.serialize()), + ]) } /// Implementation of [`DFExtensionType`] for [`TemperatureExtensionType`]. 
@@ -227,7 +224,7 @@ impl DFExtensionType for TemperatureExtensionType { } fn serialize_metadata(&self) -> Option { - ExtensionType::serialize_metadata(self) + Some(self.temperature_unit.serialize()) } fn create_array_formatter<'fmt>( diff --git a/datafusion/common/src/types/canonical_extensions/bool8.rs b/datafusion/common/src/types/canonical_extensions/bool8.rs new file mode 100644 index 0000000000000..9988bf01543be --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/bool8.rs @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Result; +use crate::error::_internal_err; +use crate::types::extension::DFExtensionType; +use arrow::array::{Array, Int8Array}; +use arrow::datatypes::DataType; +use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use arrow_schema::extension::{Bool8, ExtensionType}; +use std::fmt::Write; + +/// Defines the extension type logic for the canonical `arrow.bool8` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. 
+#[derive(Debug, Clone)] +pub struct DFBool8(Bool8); + +impl DFBool8 { + /// Creates a new [`DFBool8`], validating that the storage type is compatible with the + /// extension type. + pub fn try_new( + data_type: &DataType, + metadata: ::Metadata, + ) -> Result { + Ok(Self(::try_new( + data_type, metadata, + )?)) + } +} + +impl DFExtensionType for DFBool8 { + fn storage_type(&self) -> DataType { + DataType::Int8 + } + + fn serialize_metadata(&self) -> Option { + self.0.serialize_metadata() + } + + fn create_array_formatter<'fmt>( + &self, + array: &'fmt dyn Array, + options: &FormatOptions<'fmt>, + ) -> Result>> { + if array.data_type() != &DataType::Int8 { + return _internal_err!("Wrong array type for Bool8"); + } + + let display_index = Bool8ValueDisplayIndex { + array: array.as_any().downcast_ref().unwrap(), + null_str: options.null(), + }; + Ok(Some(ArrayFormatter::new( + Box::new(display_index), + options.safe(), + ))) + } +} + +/// Pretty printer for Bool8 values. +#[derive(Debug, Clone, Copy)] +struct Bool8ValueDisplayIndex<'a> { + array: &'a Int8Array, + null_str: &'a str, +} + +impl DisplayIndex for Bool8ValueDisplayIndex<'_> { + fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { + if self.array.is_null(idx) { + write!(f, "{}", self.null_str)?; + return Ok(()); + } + + let bytes = self.array.value(idx); + write!(f, "{}", bytes != 0)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_pretty_bool8() { + let values = Int8Array::from_iter([Some(0), Some(1), Some(-20), None]); + + let extension_type = DFBool8(Bool8 {}); + let formatter = extension_type + .create_array_formatter(&values, &FormatOptions::default().with_null("NULL")) + .unwrap() + .unwrap(); + + assert_eq!(formatter.value(0).to_string(), "false"); + assert_eq!(formatter.value(1).to_string(), "true"); + assert_eq!(formatter.value(2).to_string(), "true"); + assert_eq!(formatter.value(3).to_string(), "NULL"); + } +} diff --git
a/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs b/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs new file mode 100644 index 0000000000000..7f9789d1a2a30 --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/fixed_shape_tensor.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Result; +use crate::types::extension::DFExtensionType; +use arrow::datatypes::DataType; +use arrow_schema::extension::{ExtensionType, FixedShapeTensor}; + +/// Defines the extension type logic for the canonical `arrow.fixed_shape_tensor` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug, Clone)] +pub struct DFFixedShapeTensor { + inner: FixedShapeTensor, + /// The storage type of the tensor. + /// + /// While we could reconstruct the storage type from the inner [`FixedShapeTensor`], we may + /// choose a different name for the field within the [`DataType::FixedSizeList`] which can + /// cause problems down the line (e.g., checking for equality). 
+ storage_type: DataType, +} + +impl DFFixedShapeTensor { + /// Creates a new [`DFFixedShapeTensor`], validating that the storage type is compatible with + /// the extension type. + pub fn try_new( + data_type: &DataType, + metadata: ::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFFixedShapeTensor { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} diff --git a/datafusion/common/src/types/canonical_extensions/json.rs b/datafusion/common/src/types/canonical_extensions/json.rs new file mode 100644 index 0000000000000..381edaa66708b --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/json.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Result; +use crate::types::extension::DFExtensionType; +use arrow::datatypes::DataType; +use arrow_schema::extension::{ExtensionType, Json}; + +/// Defines the extension type logic for the canonical `arrow.json` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. 
+#[derive(Debug, Clone)] +pub struct DFJson { + inner: Json, + storage_type: DataType, +} + +impl DFJson { + /// Creates a new [`DFJson`], validating that the storage type is compatible with the + /// extension type. + pub fn try_new( + data_type: &DataType, + metadata: ::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFJson { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} diff --git a/datafusion/common/src/types/canonical_extensions/mod.rs b/datafusion/common/src/types/canonical_extensions/mod.rs index e61c415b44811..2d74d0669d213 100644 --- a/datafusion/common/src/types/canonical_extensions/mod.rs +++ b/datafusion/common/src/types/canonical_extensions/mod.rs @@ -15,4 +15,18 @@ // specific language governing permissions and limitations // under the License. +mod bool8; +mod fixed_shape_tensor; +mod json; +mod opaque; +mod timestamp_with_offset; mod uuid; +mod variable_shape_tensor; + +pub use bool8::DFBool8; +pub use fixed_shape_tensor::DFFixedShapeTensor; +pub use json::DFJson; +pub use opaque::DFOpaque; +pub use timestamp_with_offset::DFTimestampWithOffset; +pub use uuid::DFUuid; +pub use variable_shape_tensor::DFVariableShapeTensor; diff --git a/datafusion/common/src/types/canonical_extensions/opaque.rs b/datafusion/common/src/types/canonical_extensions/opaque.rs new file mode 100644 index 0000000000000..fd41413419d1b --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/opaque.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Result; +use crate::types::extension::DFExtensionType; +use arrow::datatypes::DataType; +use arrow_schema::extension::{ExtensionType, Opaque}; + +/// Defines the extension type logic for the canonical `arrow.opaque` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug, Clone)] +pub struct DFOpaque { + inner: Opaque, + storage_type: DataType, +} + +impl DFOpaque { + /// Creates a new [`DFOpaque`], validating that the storage type is compatible with the + /// extension type. + pub fn try_new( + data_type: &DataType, + metadata: ::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFOpaque { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} diff --git a/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs new file mode 100644 index 0000000000000..980741bb798cf --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/timestamp_with_offset.rs @@ -0,0 +1,300 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Result; +use crate::ScalarValue; +use crate::error::_internal_err; +use crate::types::extension::DFExtensionType; +use arrow::array::{Array, AsArray, Int16Array}; +use arrow::buffer::NullBuffer; +use arrow::compute::cast; +use arrow::datatypes::{ + DataType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, +}; +use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use arrow_schema::ArrowError; +use arrow_schema::extension::{ExtensionType, TimestampWithOffset}; +use std::fmt::Write; + +/// Defines the extension type logic for the canonical `arrow.timestamp_with_offset` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug, Clone)] +pub struct DFTimestampWithOffset { + inner: TimestampWithOffset, + storage_type: DataType, +} + +impl DFTimestampWithOffset { + /// Creates a new [`DFTimestampWithOffset`], validating that the storage type is compatible with + /// the extension type. 
+ pub fn try_new( + data_type: &DataType, + metadata: ::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFTimestampWithOffset { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } + + fn create_array_formatter<'fmt>( + &self, + array: &'fmt dyn Array, + options: &FormatOptions<'fmt>, + ) -> Result>> { + if array.data_type() != &self.storage_type { + return _internal_err!( + "Unexpected data type for TimestampWithOffset: {}", + array.data_type() + ); + } + + let struct_array = array.as_struct(); + let timestamp_array = struct_array + .column_by_name("timestamp") + .expect("Type checked above") + .as_ref(); + let raw_offset_array = struct_array + .column_by_name("offset_minutes") + .expect("Type checked above"); + + // Get a regular [`Int16Array`], if the offset array is a dictionary or run-length encoded. + let offset_array = cast(&raw_offset_array, &DataType::Int16)? + .as_primitive() + .clone(); + + let display_index = TimestampWithOffsetDisplayIndex { + null_buffer: struct_array.nulls(), + timestamp_array, + offset_array, + options: options.clone(), + }; + + Ok(Some(ArrayFormatter::new( + Box::new(display_index), + options.safe(), + ))) + } +} + +struct TimestampWithOffsetDisplayIndex<'a> { + /// The inner arrays are always non-null. Use the null buffer of the struct array to check + /// whether an element is null. 
+ null_buffer: Option<&'a NullBuffer>, + timestamp_array: &'a dyn Array, + offset_array: Int16Array, + options: FormatOptions<'a>, +} + +impl DisplayIndex for TimestampWithOffsetDisplayIndex<'_> { + fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { + if self.null_buffer.map(|nb| nb.is_null(idx)).unwrap_or(false) { + write!(f, "{}", self.options.null())?; + return Ok(()); + } + + let offset_minutes = self.offset_array.value(idx); + let offset = format_offset(offset_minutes); + + // The timestamp array must be UTC, so we can ignore the timezone. + let scalar = match self.timestamp_array.data_type() { + DataType::Timestamp(TimeUnit::Second, _) => { + let ts = self + .timestamp_array + .as_primitive::() + .value(idx); + ScalarValue::TimestampSecond(Some(ts), Some(offset.into())) + } + DataType::Timestamp(TimeUnit::Millisecond, _) => { + let ts = self + .timestamp_array + .as_primitive::() + .value(idx); + ScalarValue::TimestampMillisecond(Some(ts), Some(offset.into())) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + let ts = self + .timestamp_array + .as_primitive::() + .value(idx); + ScalarValue::TimestampMicrosecond(Some(ts), Some(offset.into())) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + let ts = self + .timestamp_array + .as_primitive::() + .value(idx); + ScalarValue::TimestampNanosecond(Some(ts), Some(offset.into())) + } + _ => unreachable!("TimestampWithOffset storage must be a Timestamp array"), + }; + + let array = scalar.to_array().map_err(|_| { + ArrowError::ComputeError("Failed to convert scalar to array".to_owned()) + })?; + let formatter = ArrayFormatter::try_new(&array, &self.options)?; + formatter.value(0).write(f)?; + + Ok(()) + } +} + +/// Formats the offset in the format `+/-HH:MM`, which can be used as an offset in the regular +/// timestamp types. 
+fn format_offset(minutes: i16) -> String { + let sign = if minutes >= 0 { '+' } else { '-' }; + let minutes = minutes.abs(); + let hours = minutes / 60; + let minutes = minutes % 60; + format!("{sign}{hours:02}:{minutes:02}") +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ + Array, DictionaryArray, Int16Array, Int32Array, RunArray, StructArray, + TimestampSecondArray, + }; + use arrow::buffer::NullBuffer; + use arrow::datatypes::{Field, Fields, Int16Type, Int32Type}; + use chrono::{TimeZone, Utc}; + use std::sync::Arc; + + #[test] + fn test_pretty_print_timestamp_with_offset() -> Result<(), ArrowError> { + let ts = Utc + .with_ymd_and_hms(2024, 4, 1, 0, 0, 0) + .unwrap() + .timestamp(); + + let offset_array = Arc::new(Int16Array::from(vec![60, -105, 0])); + + run_formatting_test( + vec![ts, ts, ts], + offset_array, + Some(NullBuffer::from(vec![true, true, false])), + FormatOptions::default().with_null("NULL"), + &[ + "2024-04-01T01:00:00+01:00", + "2024-03-31T22:15:00-01:45", + "NULL", + ], + ) + } + + #[test] + fn test_pretty_print_dictionary_offset() -> Result<(), ArrowError> { + let ts = Utc + .with_ymd_and_hms(2024, 4, 1, 12, 0, 0) + .unwrap() + .timestamp(); + + let offset_array = Arc::new(DictionaryArray::::new( + Int16Array::from(vec![0, 1, 0]), + Arc::new(Int16Array::from(vec![60, -60])), + )); + + run_formatting_test( + vec![ts, ts, ts], + offset_array, + None, + FormatOptions::default(), + &[ + "2024-04-01T13:00:00+01:00", + "2024-04-01T11:00:00-01:00", + "2024-04-01T13:00:00+01:00", + ], + ) + } + + #[test] + fn test_pretty_print_rle_offset() -> Result<(), ArrowError> { + let ts = Utc + .with_ymd_and_hms(2024, 4, 1, 12, 0, 0) + .unwrap() + .timestamp(); + + let run_ends = Int32Array::from(vec![2]); + let values = Int16Array::from(vec![120]); + let offset_array = Arc::new(RunArray::::try_new(&run_ends, &values)?); + + run_formatting_test( + vec![ts, ts], + offset_array, + None, + FormatOptions::default(), + 
&["2024-04-01T14:00:00+02:00", "2024-04-01T14:00:00+02:00"], + ) + } + + /// Create valid fields with flexible offset types + fn create_fields_custom_offset(time_unit: TimeUnit, offset_type: DataType) -> Fields { + let ts_field = Field::new( + "timestamp", + DataType::Timestamp(time_unit, Some("UTC".into())), + false, + ); + let offset_field = Field::new("offset_minutes", offset_type, false); + Fields::from(vec![ts_field, offset_field]) + } + + /// Helper to construct the arrays, run the formatter, and assert the expected strings. + fn run_formatting_test( + timestamps: Vec, + offset_array: Arc, + null_buffer: Option, + options: FormatOptions, + expected: &[&str], + ) -> Result<(), ArrowError> { + let fields = create_fields_custom_offset( + TimeUnit::Second, + offset_array.data_type().clone(), + ); + + let struct_array = StructArray::try_new( + fields, + vec![ + Arc::new(TimestampSecondArray::from(timestamps).with_timezone("UTC")), + offset_array, + ], + null_buffer, + )?; + + let formatter = DFTimestampWithOffset::try_new(struct_array.data_type(), ())? + .create_array_formatter(&struct_array, &options)? + .unwrap(); + + for (i, expected_str) in expected.iter().enumerate() { + assert_eq!(formatter.value(i).to_string(), *expected_str); + } + + Ok(()) + } +} diff --git a/datafusion/common/src/types/canonical_extensions/uuid.rs b/datafusion/common/src/types/canonical_extensions/uuid.rs index a4011e5827f8b..c2b7a522345fa 100644 --- a/datafusion/common/src/types/canonical_extensions/uuid.rs +++ b/datafusion/common/src/types/canonical_extensions/uuid.rs @@ -15,31 +15,47 @@ // specific language governing permissions and limitations // under the License. 
+use crate::Result; use crate::error::_internal_err; use crate::types::extension::DFExtensionType; use arrow::array::{Array, FixedSizeBinaryArray}; use arrow::datatypes::DataType; use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use arrow_schema::extension::{ExtensionType, Uuid}; use std::fmt::Write; -use uuid::{Bytes, Uuid}; +use uuid::Bytes; /// Defines the extension type logic for the canonical `arrow.uuid` extension type. /// /// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. -impl DFExtensionType for arrow_schema::extension::Uuid { +#[derive(Debug, Clone)] +pub struct DFUuid(Uuid); + +impl DFUuid { + /// Creates a new [`DFUuid`], validating that the storage type is compatible with the + /// extension type. + pub fn try_new( + data_type: &DataType, + metadata: ::Metadata, + ) -> Result { + Ok(Self(::try_new(data_type, metadata)?)) + } +} + +impl DFExtensionType for DFUuid { fn storage_type(&self) -> DataType { DataType::FixedSizeBinary(16) } fn serialize_metadata(&self) -> Option { - None + self.0.serialize_metadata() } fn create_array_formatter<'fmt>( &self, array: &'fmt dyn Array, options: &FormatOptions<'fmt>, - ) -> crate::Result>> { + ) -> Result>> { if array.data_type() != &DataType::FixedSizeBinary(16) { return _internal_err!("Wrong array type for Uuid"); } @@ -71,7 +87,7 @@ impl DisplayIndex for UuidValueDisplayIndex<'_> { let bytes = Bytes::try_from(self.array.value(idx)) .expect("FixedSizeBinaryArray length checked in create_array_formatter"); - let uuid = Uuid::from_bytes(bytes); + let uuid = uuid::Uuid::from_bytes(bytes); write!(f, "{uuid}")?; Ok(()) } @@ -81,23 +97,23 @@ impl DisplayIndex for UuidValueDisplayIndex<'_> { mod tests { use super::*; use crate::ScalarValue; + use arrow_schema::ArrowError; #[test] - pub fn test_pretty_print_uuid() { - let my_uuid = Uuid::nil(); + pub fn test_pretty_print_uuid() -> Result<(), ArrowError> { + let my_uuid = uuid::Uuid::nil(); let uuid 
= ScalarValue::FixedSizeBinary(16, Some(my_uuid.as_bytes().to_vec())) - .to_array_of_size(1) - .unwrap(); + .to_array_of_size(1)?; - let extension_type = arrow_schema::extension::Uuid {}; - let formatter = extension_type - .create_array_formatter(uuid.as_ref(), &FormatOptions::default()) - .unwrap() + let formatter = DFUuid::try_new(uuid.data_type(), ())? + .create_array_formatter(uuid.as_ref(), &FormatOptions::default())? .unwrap(); assert_eq!( formatter.value(0).to_string(), "00000000-0000-0000-0000-000000000000" ); + + Ok(()) } } diff --git a/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs b/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs new file mode 100644 index 0000000000000..19efc4ae38522 --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/variable_shape_tensor.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Result; +use crate::types::extension::DFExtensionType; +use arrow::datatypes::DataType; +use arrow_schema::extension::{ExtensionType, VariableShapeTensor}; + +/// Defines the extension type logic for the canonical `arrow.variable_shape_tensor` extension type. 
+/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug, Clone)] +pub struct DFVariableShapeTensor { + inner: VariableShapeTensor, + /// While we could reconstruct the storage type from the inner [`VariableShapeTensor`], we may + /// choose a different name for the field within the [`DataType::List`] which can cause problems + /// down the line (e.g., checking for equality). + storage_type: DataType, +} + +impl DFVariableShapeTensor { + /// Creates a new [`DFVariableShapeTensor`], validating that the storage type is compatible with + /// the extension type. + pub fn try_new( + data_type: &DataType, + metadata: ::Metadata, + ) -> Result { + Ok(Self { + inner: ::try_new(data_type, metadata)?, + storage_type: data_type.clone(), + }) + } +} + +impl DFExtensionType for DFVariableShapeTensor { + fn storage_type(&self) -> DataType { + self.storage_type.clone() + } + + fn serialize_metadata(&self) -> Option { + self.inner.serialize_metadata() + } +} diff --git a/datafusion/common/src/types/extension.rs b/datafusion/common/src/types/extension.rs index c05f8f19f18a9..3bcb533dbf9e6 100644 --- a/datafusion/common/src/types/extension.rs +++ b/datafusion/common/src/types/extension.rs @@ -55,6 +55,16 @@ pub type DFExtensionTypeRef = Arc; /// Furthermore, the current trait in arrow-rs is not dyn-compatible, which we need for implementing /// extension type registries. In the future, the two implementations may increasingly converge. /// +/// Another difference is that [`DFExtensionType`] represents a fully resolved extension type that +/// has a fixed storage type (i.e., [`DataType`]). This is different from arrow-rs, which only +/// stores the extension type's metadata. For example, an instance of DataFusion's JSON extension +/// type fixes one of the three possible storage types: [`DataType::Utf8`], +/// [`DataType::LargeUtf8`], or [`DataType::Utf8View`]. 
This fixed storage type is returned in +/// [`DFExtensionType::storage_type`]. This is not possible in arrow-rs' extension type instances. +/// This is the reason why we have different types in DataFusion that usually delegate the metadata +/// processing to the underlying arrow-rs extension type instance +/// (e.g., [`DFJson`](crate::types::DFJson) instead of [`Json`](arrow_schema::extension::Json)). +/// /// # Examples /// /// Examples for using the extension type machinery can be found in the DataFusion examples diff --git a/datafusion/common/src/types/mod.rs b/datafusion/common/src/types/mod.rs index 82455063bc6ce..57bf921a6d564 100644 --- a/datafusion/common/src/types/mod.rs +++ b/datafusion/common/src/types/mod.rs @@ -23,6 +23,7 @@ mod logical; mod native; pub use builtin::*; +pub use canonical_extensions::*; pub use extension::*; pub use field::*; pub use logical::*; diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 52a29019bc96b..4be163a4bd8ea 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -22,8 +22,14 @@ use crate::planner::ExprPlanner; use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; use arrow::datatypes::Field; use arrow_schema::DataType; -use arrow_schema::extension::ExtensionType; -use datafusion_common::types::{DFExtensionType, DFExtensionTypeRef}; +use arrow_schema::extension::{ + Bool8, ExtensionType, FixedShapeTensor, Json, Opaque, TimestampWithOffset, Uuid, + VariableShapeTensor, +}; +use datafusion_common::types::{ + DFBool8, DFExtensionTypeRef, DFFixedShapeTensor, DFJson, DFOpaque, + DFTimestampWithOffset, DFUuid, DFVariableShapeTensor, +}; use datafusion_common::{HashMap, Result, not_impl_err, plan_datafusion_err}; use std::collections::HashSet; use std::fmt::{Debug, Formatter}; @@ -220,42 +226,6 @@ impl FunctionRegistry for MemoryFunctionRegistry { } } -/// A cheaply cloneable pointer to an [ExtensionTypeRegistration]. 
-pub type ExtensionTypeRegistrationRef = Arc; - -/// The registration of an extension type. Implementations of this trait are responsible for -/// *creating* instances of [`DFExtensionType`] that represent the entire semantics of an extension -/// type. -/// -/// # Why do we need a Registration? -/// -/// A good question is why this trait is even necessary. Why not directly register the -/// [`DFExtensionType`] in a registration? -/// -/// While this works for extension types requiring no additional metadata (e.g., `arrow.uuid`), it -/// does not work for more complex extension types with metadata. For example, consider an extension -/// type `custom.shortened(n)` that aims to short the pretty-printing string to `n` characters. -/// Here, `n` is a parameter of the extension type and should be a field in the struct that -/// implements the [`DFExtensionType`]. The job of the registration is to read the metadata from the -/// field and create the corresponding [`DFExtensionType`] instance with the correct `n` set. -/// -/// The [`DefaultExtensionTypeRegistration`] provides a convenient way of creating registrations. -pub trait ExtensionTypeRegistration: Debug + Send + Sync { - /// The name of the extension type. - /// - /// This name will be used to find the correct [ExtensionTypeRegistration] when an extension - /// type is encountered. - fn type_name(&self) -> &str; - - /// Creates an extension type instance from the optional metadata. The name of the extension - /// type is not a parameter as it's already defined by the registration itself. - fn create_df_extension_type( - &self, - storage_type: &DataType, - metadata: Option<&str>, - ) -> Result; -} - /// A cheaply cloneable pointer to an [ExtensionTypeRegistry]. pub type ExtensionTypeRegistryRef = Arc; @@ -292,7 +262,7 @@ pub trait ExtensionTypeRegistry: Debug + Send + Sync { } /// Returns all registered [ExtensionTypeRegistration]. 
- fn extension_type_registrations(&self) -> Vec>; + fn extension_type_registrations(&self) -> Vec; /// Registers a new [ExtensionTypeRegistrationRef], returning any previously registered /// implementation. @@ -328,74 +298,78 @@ pub trait ExtensionTypeRegistry: Debug + Send + Sync { /// A factory that creates instances of extension types from a storage [`DataType`] and the /// metadata. -pub type ExtensionTypeFactory = dyn Fn(&DataType, ::Metadata) -> Result - + Send - + Sync; - -/// A default implementation of [ExtensionTypeRegistration] that parses the metadata from the -/// given extension type and passes it to a constructor function. -pub struct DefaultExtensionTypeRegistration< - TExtensionType: ExtensionType + DFExtensionType + 'static, -> { +pub type ExtensionTypeFactory = + dyn Fn(&DataType, Option<&str>) -> Result + Send + Sync; + +/// A cheaply cloneable pointer to an [ExtensionTypeRegistration]. +pub type ExtensionTypeRegistrationRef = Arc; + +/// The registration of an extension type. Implementations of this trait are responsible for +/// *creating* instances of [`DFExtensionType`] that represent the entire semantics of an extension +/// type. +/// +/// # Why do we need a Registration? +/// +/// A good question is why this trait is even necessary. Why not directly register the +/// [`DFExtensionType`] in a registry? +/// +/// While this works for extension types requiring no additional metadata (e.g., `arrow.uuid`), it +/// does not work for more complex extension types with metadata. For example, consider an extension +/// type `custom.shortened(n)` that aims to shorten the pretty-printing string to `n` characters. +/// Here, `n` is a parameter of the extension type and should be a field in the struct that +/// implements the [`DFExtensionType`]. The job of the registration is to read the metadata from the +/// field and create the corresponding [`DFExtensionType`] instance with the correct `n` set. 
+/// +/// [`DFExtensionType`]: datafusion_common::types::DFExtensionType +pub struct ExtensionTypeRegistration { + /// The name of the extension type. + name: String, /// A function that creates an instance of [`DFExtensionTypeRef`] from the storage type and the /// metadata. - factory: Box>, + factory: Box, } -impl - DefaultExtensionTypeRegistration -{ - /// Creates a new registration for an extension type. - /// - /// The factory is not required to validate the storage [`DataType`], as the compatibility will - /// be checked by the registration using [`ExtensionType::supports_data_type`]. However, the - /// factory may still choose to do so. +impl ExtensionTypeRegistration { + /// Creates a new registration for an extension type. The factory is required to validate that + /// the storage [`DataType`] is compatible with the extension type. pub fn new_arc( - factory: impl Fn(&DataType, TExtensionType::Metadata) -> Result + name: impl Into, + factory: impl Fn(&DataType, Option<&str>) -> Result + Send + Sync + 'static, ) -> ExtensionTypeRegistrationRef { Arc::new(Self { + name: name.into(), factory: Box::new(factory), }) } } -impl ExtensionTypeRegistration - for DefaultExtensionTypeRegistration -{ - fn type_name(&self) -> &str { - TExtensionType::NAME +impl ExtensionTypeRegistration { + /// The name of the extension type. + /// + /// This name will be used to find the correct [ExtensionTypeRegistration] when an extension + /// type is encountered. + pub fn type_name(&self) -> &str { + &self.name } - fn create_df_extension_type( + /// Creates an extension type instance from the optional metadata. The name of the extension + /// type is not a parameter as it's already defined by the registration itself. 
+ pub fn create_df_extension_type( &self, storage_type: &DataType, metadata: Option<&str>, ) -> Result { - let metadata = TExtensionType::deserialize_metadata(metadata)?; - let type_instance = self.factory.as_ref()(storage_type, metadata)?; - type_instance - .supports_data_type(storage_type) - .map_err(|_| { - plan_datafusion_err!( - "Extension type {} obtained from registration does not support the storage data type {}", - TExtensionType::NAME, - storage_type - ) - })?; - - Ok(Arc::new(type_instance) as DFExtensionTypeRef) + self.factory.as_ref()(storage_type, metadata) } } -impl Debug - for DefaultExtensionTypeRegistration -{ +impl Debug for ExtensionTypeRegistration { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("DefaultExtensionTypeRegistration") - .field("type_name", &TExtensionType::NAME) + .field("type_name", &self.name) .finish() } } @@ -424,9 +398,59 @@ impl MemoryExtensionTypeRegistry { /// Pre-registers the [canonical extension types](https://arrow.apache.org/docs/format/CanonicalExtensions.html) /// in the extension type registry. 
pub fn new_with_canonical_extension_types() -> Self { - let mapping = [DefaultExtensionTypeRegistration::new_arc(|_, _| { - Ok(arrow_schema::extension::Uuid {}) - })]; + let mapping = [ + ExtensionTypeRegistration::new_arc( + FixedShapeTensor::NAME, + |storage_type, metadata| { + Ok(Arc::new(DFFixedShapeTensor::try_new( + storage_type, + FixedShapeTensor::deserialize_metadata(metadata)?, + )?)) + }, + ), + ExtensionTypeRegistration::new_arc( + VariableShapeTensor::NAME, + |storage_type, metadata| { + Ok(Arc::new(DFVariableShapeTensor::try_new( + storage_type, + VariableShapeTensor::deserialize_metadata(metadata)?, + )?)) + }, + ), + ExtensionTypeRegistration::new_arc(Json::NAME, |storage_type, metadata| { + Ok(Arc::new(DFJson::try_new( + storage_type, + Json::deserialize_metadata(metadata)?, + )?)) + }), + ExtensionTypeRegistration::new_arc(Uuid::NAME, |storage_type, metadata| { + Ok(Arc::new(DFUuid::try_new( + storage_type, + Uuid::deserialize_metadata(metadata)?, + )?)) + }), + ExtensionTypeRegistration::new_arc(Opaque::NAME, |storage_type, metadata| { + Ok(Arc::new(DFOpaque::try_new( + storage_type, + Opaque::deserialize_metadata(metadata)?, + )?)) + }), + ExtensionTypeRegistration::new_arc(Bool8::NAME, |storage_type, metadata| { + Ok(Arc::new(DFBool8::try_new( + storage_type, + Bool8::deserialize_metadata(metadata)?, + )?)) + }), + ExtensionTypeRegistration::new_arc( + TimestampWithOffset::NAME, + |storage_type, metadata| { + Ok(Arc::new(DFTimestampWithOffset::try_new( + storage_type, + TimestampWithOffset::deserialize_metadata(metadata)?, + )?)) + }, + ), + ]; let mut extension_types = HashMap::new(); for registration in mapping.into_iter() { @@ -479,7 +503,7 @@ impl ExtensionTypeRegistry for MemoryExtensionTypeRegistry { .cloned() } - fn extension_type_registrations(&self) -> Vec> { + fn extension_type_registrations(&self) -> Vec { self.extension_types .read() .expect("Extension type registry lock poisoned")