From 3d3283a12784e0727d8e15fe290b9d544ddd08e2 Mon Sep 17 00:00:00 2001 From: Martin Sahlen Date: Sun, 17 May 2026 16:27:41 +0300 Subject: [PATCH] feat: UNNEST with ordinality and offset support --- datafusion/common/src/lib.rs | 2 +- datafusion/common/src/unnest.rs | 43 ++++ datafusion/expr/src/logical_plan/plan.rs | 13 ++ datafusion/physical-plan/src/unnest.rs | 79 ++++++- datafusion/proto/proto/datafusion.proto | 16 ++ datafusion/proto/src/generated/pbjson.rs | 198 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 40 ++++ .../proto/src/logical_plan/from_proto.rs | 19 +- datafusion/proto/src/logical_plan/to_proto.rs | 18 +- .../tests/cases/roundtrip_logical_plan.rs | 22 ++ datafusion/sql/src/relation/mod.rs | 42 ++-- datafusion/sql/src/select.rs | 23 +- datafusion/sqllogictest/test_files/unnest.slt | 42 +++- 13 files changed, 531 insertions(+), 26 deletions(-) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 06e567cb12672..2443c7420feb6 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -95,7 +95,7 @@ pub use schema_reference::SchemaReference; pub use spans::{Location, Span, Spans}; pub use stats::{ColumnStatistics, Statistics}; pub use table_reference::{ResolvedTableReference, TableReference}; -pub use unnest::{RecursionUnnestOption, UnnestOptions}; +pub use unnest::{IndexBase, PositionColumn, RecursionUnnestOption, UnnestOptions}; pub use utils::project_schema; // These are hidden from docs purely to avoid polluting the public view of what this crate exports. diff --git a/datafusion/common/src/unnest.rs b/datafusion/common/src/unnest.rs index db48edd061605..81c6d5e96b4bc 100644 --- a/datafusion/common/src/unnest.rs +++ b/datafusion/common/src/unnest.rs @@ -66,6 +66,11 @@ use crate::Column; /// `recursions` instruct how a column should be unnested (e.g unnesting a column multiple /// time, with depth = 1 and depth = 2). Any unnested column not being mentioned inside this /// options is inferred to be unnested with depth = 1 +/// +/// If `position` is set, an additional column is appended to the output containing the +/// position of each element within its source list. The index base is selected by the +/// SQL spelling used: `WITH ORDINALITY` (Postgres, SQL standard) is 1-indexed, +/// `WITH OFFSET` (BigQuery) is 0-indexed. #[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)] pub struct UnnestOptions { /// Should nulls in the input be preserved? Defaults to true @@ -74,6 +79,9 @@ pub struct UnnestOptions { /// declare them here. Any unnested columns not being mentioned inside this option /// will be unnested with depth = 1 pub recursions: Vec, + /// If set, append a position column to the output (per-list element index). + /// Defaults to `None` (no position column emitted). + pub position: Option, } /// Instruction on how to unnest a column (mostly with a list type) @@ -85,12 +93,41 @@ pub struct RecursionUnnestOption { pub depth: usize, } +/// The 0/1 index base for the position column emitted by `UNNEST`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum IndexBase { + /// 0-indexed (BigQuery `WITH OFFSET`, Snowflake `FLATTEN.INDEX`, Spark `posexplode`). + Zero, + /// 1-indexed (Postgres / SQL standard `WITH ORDINALITY`, Trino/Presto). + One, +} + +/// Specification for the extra position column produced by `UNNEST WITH ORDINALITY` +/// (1-indexed) or `UNNEST WITH OFFSET` (0-indexed). +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)] +pub struct PositionColumn { + /// Output column name (e.g. `"ordinality"`, `"offset"`, or a user alias). + pub name: String, + /// Whether the column is 0- or 1-indexed. + pub base: IndexBase, +} + +impl PositionColumn { + pub fn new(name: impl Into, base: IndexBase) -> Self { + Self { + name: name.into(), + base, + } + } +} + impl Default for UnnestOptions { fn default() -> Self { Self { // default to true to maintain backwards compatible behavior preserve_nulls: true, recursions: vec![], + position: None, } } } @@ -113,4 +150,10 @@ impl UnnestOptions { self.recursions.push(recursion); self } + + /// Request a position column on the output (see [`PositionColumn`]). + pub fn with_position(mut self, position: PositionColumn) -> Self { + self.position = Some(position); + self + } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index c572b202f03ce..68a086494a512 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -4286,6 +4286,12 @@ impl Unnest { // 4.unnest_col2_depth_1: int // Meaning the placeholder column will be replaced by its unnested variation(s), note // the plural. + // Pick one of the unnest target indices as the dependency anchor for the + // synthetic position column (it depends on the unnest happening at all, + // not on any one input column). Safe to unwrap because exec_columns was + // verified non-empty above. + let position_dep_index = *indices_to_unnest.keys().next().unwrap(); + let fields = input_schema .iter() .enumerate() @@ -4372,6 +4378,13 @@ impl Unnest { .flatten() .collect::>(); + let mut fields = fields; + if let Some(position) = &options.position { + let pos_field = Arc::new(Field::new(&position.name, DataType::Int64, true)); + fields.push((None, pos_field)); + dependency_indices.push(position_dep_index); + } + let metadata = input_schema.metadata().clone(); let df_schema = DFSchema::new_with_metadata(fields, metadata)?; // We can use the existing functional dependencies: diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 3a4b9d7232f4d..694b258362c6a 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -46,8 +46,8 @@ use arrow_ord::cmp::lt; use async_trait::async_trait; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{ - Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err, - internal_err, + Constraints, HashMap, HashSet, IndexBase, Result, UnnestOptions, exec_datafusion_err, + exec_err, internal_err, }; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; @@ -498,6 +498,7 @@ fn list_unnest_at_level( temp_unnested_arrs: &mut HashMap, level_to_unnest: usize, options: &UnnestOptions, + position_array_out: &mut Option, ) -> Result>> { // Extract unnestable columns at this level let (arrs_to_unnest, list_unnest_specs): (Vec>, Vec<_>) = @@ -543,6 +544,20 @@ fn list_unnest_at_level( // Create the take indices array for other columns let take_indices = create_take_indices(unnested_length, total_length); + + // At the final unnest level, materialize the WITH ORDINALITY / WITH OFFSET + // position column if the caller requested one. Multi-level (recursive) unnest + // currently always reports the leaf-level position. + if level_to_unnest == 1 + && let Some(position) = &options.position + { + *position_array_out = Some(Arc::new(create_position_indices( + unnested_length, + total_length, + position.base, + )) as ArrayRef); + } + unnested_temp_arrays .into_iter() .zip(list_unnest_specs.iter()) @@ -643,6 +658,7 @@ fn build_batch( struct_column_indices: &HashSet, options: &UnnestOptions, ) -> Result> { + let mut position_array: Option = None; let transformed = match list_type_columns.len() { 0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices), _ => { @@ -669,6 +685,7 @@ fn build_batch( &mut temp_unnested_result, depth, options, + &mut position_array, )? else { return Ok(None); @@ -740,7 +757,7 @@ fn build_batch( ) .collect::>(); - let ret = flatten_arrs + let mut ret = flatten_arrs .into_iter() .enumerate() .flat_map(|(col_idx, arr)| { @@ -754,6 +771,12 @@ fn build_batch( }) .collect::>(); + // Append the WITH ORDINALITY / WITH OFFSET position column, if any. + // The Unnest logical plan placed this as the last schema field. + if let Some(pos_arr) = position_array.take() { + ret.push(pos_arr); + } + flatten_struct_cols(&ret, schema, struct_column_indices) } }?; @@ -996,6 +1019,34 @@ fn create_take_indices( builder.finish() } +/// Builds the per-element position column for `UNNEST ... WITH ORDINALITY` +/// (1-indexed) or `UNNEST ... WITH OFFSET` (0-indexed). +/// +/// Given the length array `[2, 3, 1]` and `base = Zero`, returns `[0, 1, 0, 1, 2, 0]`. +/// With `base = One`, returns `[1, 2, 1, 2, 3, 1]`. +fn create_position_indices( + length_array: &PrimitiveArray, + capacity: usize, + base: IndexBase, +) -> PrimitiveArray { + debug_assert!( + length_array.null_count() == 0, + "length array should not contain nulls" + ); + let start: i64 = match base { + IndexBase::Zero => 0, + IndexBase::One => 1, + }; + let mut builder = PrimitiveArray::::builder(capacity); + for repeat in length_array.iter() { + let repeat = repeat.unwrap(); + for i in 0..repeat { + builder.append_value(start + i); + } + } + builder.finish() +} + /// Create a batch of arrays based on an input `batch` and a `indices` array. /// The `indices` array is used by the take kernel to repeat values in the arrays /// that are marked with `true` in the `repeat_mask`. Arrays marked with `false` @@ -1252,6 +1303,7 @@ mod tests { &UnnestOptions { preserve_nulls: true, recursions: vec![], + position: None, }, )? .unwrap(); @@ -1339,6 +1391,7 @@ mod tests { let options = UnnestOptions { preserve_nulls, recursions: vec![], + position: None, }; let longest_length = find_longest_length(list_arrays, &options)?; let expected_array = Int64Array::from(expected); @@ -1392,4 +1445,24 @@ mod tests { assert_eq!(take_indices, expected); Ok(()) } + + #[test] + fn test_create_position_indices_zero_based() -> Result<()> { + // BigQuery `WITH OFFSET` semantics: per-list position starts at 0. + let length_array = Int64Array::from(vec![2, 3, 1]); + let position = create_position_indices(&length_array, 6, IndexBase::Zero); + let expected = Int64Array::from(vec![0, 1, 0, 1, 2, 0]); + assert_eq!(position, expected); + Ok(()) + } + + #[test] + fn test_create_position_indices_one_based() -> Result<()> { + // Postgres / SQL-standard `WITH ORDINALITY` semantics: starts at 1. + let length_array = Int64Array::from(vec![2, 3, 1]); + let position = create_position_indices(&length_array, 6, IndexBase::One); + let expected = Int64Array::from(vec![1, 2, 1, 2, 3, 1]); + assert_eq!(position, expected); + Ok(()) + } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 865887d41e111..361204cae5524 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -333,6 +333,9 @@ message ColumnUnnestListRecursion { message UnnestOptions { bool preserve_nulls = 1; repeated RecursionUnnestOption recursions = 2; + // Set when `UNNEST ... WITH ORDINALITY` (1-indexed) or `WITH OFFSET` + // (0-indexed) was requested; absent for plain UNNEST. + optional PositionColumn position = 3; } message RecursionUnnestOption { @@ -341,6 +344,19 @@ message RecursionUnnestOption { uint32 depth = 3; } +// 0/1 index base for the synthetic position column produced by UNNEST. +enum IndexBase { + // 0-indexed (BigQuery `WITH OFFSET`, Snowflake `FLATTEN.INDEX`, Spark `posexplode`). + INDEX_BASE_ZERO = 0; + // 1-indexed (Postgres / SQL standard `WITH ORDINALITY`, Trino/Presto). + INDEX_BASE_ONE = 1; +} + +message PositionColumn { + string name = 1; + IndexBase base = 2; +} + message UnionNode { repeated LogicalPlanNode inputs = 1; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b8639afd04a89..c9b0188d29a21 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -9423,6 +9423,77 @@ impl<'de> serde::Deserialize<'de> for InListNode { deserializer.deserialize_struct("datafusion.InListNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for IndexBase { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::Zero => "INDEX_BASE_ZERO", + Self::One => "INDEX_BASE_ONE", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for IndexBase { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "INDEX_BASE_ZERO", + "INDEX_BASE_ONE", + ]; + + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = IndexBase; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "INDEX_BASE_ZERO" => Ok(IndexBase::Zero), + "INDEX_BASE_ONE" => Ok(IndexBase::One), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} impl serde::Serialize for InsertOp { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -20469,6 +20540,116 @@ impl<'de> serde::Deserialize<'de> for PlanType { deserializer.deserialize_struct("datafusion.PlanType", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PositionColumn { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.name.is_empty() { + len += 1; + } + if self.base != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PositionColumn", len)?; + if !self.name.is_empty() { + struct_ser.serialize_field("name", &self.name)?; + } + if self.base != 0 { + let v = IndexBase::try_from(self.base) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.base)))?; + struct_ser.serialize_field("base", &v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PositionColumn { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "name", + "base", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Name, + Base, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "name" => Ok(GeneratedField::Name), + "base" => Ok(GeneratedField::Base), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PositionColumn; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PositionColumn") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut name__ = None; + let mut base__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Name => { + if name__.is_some() { + return Err(serde::de::Error::duplicate_field("name")); + } + name__ = Some(map_.next_value()?); + } + GeneratedField::Base => { + if base__.is_some() { + return Err(serde::de::Error::duplicate_field("base")); + } + base__ = Some(map_.next_value::()? as i32); + } + } + } + Ok(PositionColumn { + name: name__.unwrap_or_default(), + base: base__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion.PositionColumn", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PrepareNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -25011,6 +25192,9 @@ impl serde::Serialize for UnnestOptions { if !self.recursions.is_empty() { len += 1; } + if self.position.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.UnnestOptions", len)?; if self.preserve_nulls { struct_ser.serialize_field("preserveNulls", &self.preserve_nulls)?; @@ -25018,6 +25202,9 @@ impl serde::Serialize for UnnestOptions { if !self.recursions.is_empty() { struct_ser.serialize_field("recursions", &self.recursions)?; } + if let Some(v) = self.position.as_ref() { + struct_ser.serialize_field("position", v)?; + } struct_ser.end() } } @@ -25031,12 +25218,14 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { "preserve_nulls", "preserveNulls", "recursions", + "position", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { PreserveNulls, Recursions, + Position, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -25060,6 +25249,7 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { match value { "preserveNulls" | "preserve_nulls" => Ok(GeneratedField::PreserveNulls), "recursions" => Ok(GeneratedField::Recursions), + "position" => Ok(GeneratedField::Position), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -25081,6 +25271,7 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { { let mut preserve_nulls__ = None; let mut recursions__ = None; + let mut position__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::PreserveNulls => { @@ -25095,11 +25286,18 @@ impl<'de> serde::Deserialize<'de> for UnnestOptions { } recursions__ = Some(map_.next_value()?); } + GeneratedField::Position => { + if position__.is_some() { + return Err(serde::de::Error::duplicate_field("position")); + } + position__ = map_.next_value()?; + } } } Ok(UnnestOptions { preserve_nulls: preserve_nulls__.unwrap_or_default(), recursions: recursions__.unwrap_or_default(), + position: position__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index b742e82ea24ec..8267bda31922d 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -532,6 +532,10 @@ pub struct UnnestOptions { pub preserve_nulls: bool, #[prost(message, repeated, tag = "2")] pub recursions: ::prost::alloc::vec::Vec, + /// Set when `UNNEST ... WITH ORDINALITY` (1-indexed) or `WITH OFFSET` + /// (0-indexed) was requested; absent for plain UNNEST. + #[prost(message, optional, tag = "3")] + pub position: ::core::option::Option, } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct RecursionUnnestOption { @@ -542,6 +546,13 @@ pub struct RecursionUnnestOption { #[prost(uint32, tag = "3")] pub depth: u32, } +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct PositionColumn { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(enumeration = "IndexBase", tag = "2")] + pub base: i32, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct UnionNode { #[prost(message, repeated, tag = "1")] @@ -2306,6 +2317,35 @@ impl FileFormatKind { } } } +/// 0/1 index base for the synthetic position column produced by UNNEST. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum IndexBase { + /// 0-indexed (BigQuery `WITH OFFSET`, Snowflake `FLATTEN.INDEX`, Spark `posexplode`). + Zero = 0, + /// 1-indexed (Postgres / SQL standard `WITH ORDINALITY`, Trino/Presto). + One = 1, +} +impl IndexBase { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Zero => "INDEX_BASE_ZERO", + Self::One => "INDEX_BASE_ONE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "INDEX_BASE_ZERO" => Some(Self::Zero), + "INDEX_BASE_ONE" => Some(Self::One), + _ => None, + } + } +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum WindowFrameUnits { diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 78ffd362c8e48..0e3c1a95bcb49 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -20,8 +20,9 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Field}; use datafusion_common::datatype::DataTypeExt; use datafusion_common::{ - NullEquality, RecursionUnnestOption, Result, ScalarValue, TableReference, - UnnestOptions, exec_datafusion_err, internal_err, plan_datafusion_err, + IndexBase, NullEquality, PositionColumn, RecursionUnnestOption, Result, ScalarValue, + TableReference, UnnestOptions, exec_datafusion_err, internal_err, + plan_datafusion_err, }; use datafusion_execution::TaskContext; use datafusion_execution::registry::FunctionRegistry; @@ -60,6 +61,7 @@ impl From<&protobuf::UnnestOptions> for UnnestOptions { fn from(opts: &protobuf::UnnestOptions) -> Self { Self { preserve_nulls: opts.preserve_nulls, + position: opts.position.as_ref().map(Into::into), recursions: opts .recursions .iter() @@ -73,6 +75,19 @@ impl From<&protobuf::UnnestOptions> for UnnestOptions { } } +impl From<&protobuf::PositionColumn> for PositionColumn { + fn from(pos: &protobuf::PositionColumn) -> Self { + let base = match protobuf::IndexBase::try_from(pos.base) { + Ok(protobuf::IndexBase::One) => IndexBase::One, + _ => IndexBase::Zero, + }; + Self { + name: pos.name.clone(), + base, + } + } +} + impl From for WindowFrameUnits { fn from(units: protobuf::WindowFrameUnits) -> Self { match units { diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index d79107d1d0f2b..fe197001b3d5d 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -21,7 +21,9 @@ use std::collections::HashMap; -use datafusion_common::{NullEquality, TableReference, UnnestOptions}; +use datafusion_common::{ + IndexBase, NullEquality, PositionColumn, TableReference, UnnestOptions, +}; use datafusion_expr::WriteOp; use datafusion_expr::dml::InsertOp; use datafusion_expr::expr::{ @@ -65,6 +67,20 @@ impl From<&UnnestOptions> for protobuf::UnnestOptions { depth: r.depth as u32, }) .collect(), + position: opts.position.as_ref().map(Into::into), + } + } +} + +impl From<&PositionColumn> for protobuf::PositionColumn { + fn from(pos: &PositionColumn) -> Self { + let base = match pos.base { + IndexBase::Zero => protobuf::IndexBase::Zero, + IndexBase::One => protobuf::IndexBase::One, + }; + Self { + name: pos.name.clone(), + base: base as i32, } } } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 3e79ddab723eb..e559611cba127 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -1068,6 +1068,28 @@ async fn roundtrip_logical_plan_unnest() -> Result<()> { Ok(()) } +#[tokio::test] +async fn roundtrip_logical_plan_unnest_with_ordinality() -> Result<()> { + let ctx = SessionContext::new(); + let query = "SELECT * FROM UNNEST([10, 20, 30]) WITH ORDINALITY"; + let plan = ctx.sql(query).await?.into_optimized_plan()?; + let bytes = logical_plan_to_bytes(&plan)?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?; + assert_eq!(format!("{plan}"), format!("{logical_round_trip}")); + Ok(()) +} + +#[tokio::test] +async fn roundtrip_logical_plan_unnest_with_offset_alias() -> Result<()> { + let ctx = SessionContext::new(); + let query = "SELECT * FROM UNNEST([10, 20, 30]) WITH OFFSET pos"; + let plan = ctx.sql(query).await?.into_optimized_plan()?; + let bytes = logical_plan_to_bytes(&plan)?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?; + assert_eq!(format!("{plan}"), format!("{logical_round_trip}")); + Ok(()) +} + #[tokio::test] async fn roundtrip_expr_api() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 08a292475fd72..2c05260e7619c 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -21,7 +21,8 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{ - DFSchema, Diagnostic, Result, Span, Spans, TableReference, not_impl_err, plan_err, + DFSchema, Diagnostic, IndexBase, PositionColumn, Result, Span, Spans, TableReference, + not_impl_err, plan_err, }; use datafusion_expr::builder::subquery_alias; use datafusion_expr::planner::{ @@ -228,13 +229,34 @@ impl SqlToRel<'_, S> { TableFactor::UNNEST { alias, array_exprs, - with_offset: false, - with_offset_alias: None, + with_offset, + with_offset_alias, with_ordinality, } => { - if with_ordinality { - return not_impl_err!("UNNEST with ordinality is not supported yet"); - } + // `WITH ORDINALITY` (Postgres / SQL standard, 1-indexed) and + // `WITH OFFSET` (BigQuery, 0-indexed) are syntactic siblings — + // no dialect accepts both at once. + let position = match (with_ordinality, with_offset) { + (false, false) => None, + (true, true) => { + return plan_err!( + "UNNEST cannot use WITH ORDINALITY and WITH OFFSET together" + ); + } + (true, false) => { + // `with_offset_alias` is only populated when `WITH OFFSET ` + // is parsed, which would also flip `with_offset` to true and be + // caught by the (true, true) branch above; this is just defensive. + Some(PositionColumn::new("ordinality", IndexBase::One)) + } + (false, true) => { + let name = with_offset_alias + .as_ref() + .map(|i| self.ident_normalizer.normalize(i.clone())) + .unwrap_or_else(|| "offset".to_string()); + Some(PositionColumn::new(name, IndexBase::Zero)) + } + }; // Unnest table factor has empty input let schema = DFSchema::empty(); @@ -256,14 +278,10 @@ impl SqlToRel<'_, S> { if unnest_exprs.is_empty() { return plan_err!("UNNEST must have at least one argument"); } - let logical_plan = self.try_process_unnest(input, unnest_exprs)?; + let logical_plan = + self.try_process_unnest(input, unnest_exprs, position.as_ref())?; (logical_plan, alias) } - TableFactor::UNNEST { .. } => { - return not_impl_err!( - "UNNEST table factor with offset is not supported yet" - ); - } TableFactor::Function { name, args, alias, .. } => { diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index b7f7d80e70815..ffbfa0c84cdb4 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -32,7 +32,7 @@ use arrow::datatypes::DataType; use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_err}; -use datafusion_common::{RecursionUnnestOption, UnnestOptions}; +use datafusion_common::{PositionColumn, RecursionUnnestOption, UnnestOptions}; use datafusion_expr::ExprSchemable; use datafusion_expr::builder::get_struct_unnested_columns; use datafusion_expr::expr::{PlannedReplaceSelectItem, WildcardOptions}; @@ -378,7 +378,7 @@ impl SqlToRel<'_, S> { }; // Try processing unnest expression or do the final projection - let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?; + let plan = self.try_process_unnest(plan, select_exprs_post_aggr, None)?; // Process distinct clause let plan = match select.distinct { @@ -436,10 +436,16 @@ impl SqlToRel<'_, S> { } /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection + /// + /// `position` is set when the caller is the `UNNEST` table factor with + /// `WITH ORDINALITY` (Postgres / SQL standard, 1-indexed) or `WITH OFFSET` + /// (BigQuery, 0-indexed); the resulting Unnest plan gains an extra + /// position column. pub(super) fn try_process_unnest( &self, input: LogicalPlan, select_exprs: Vec, + position: Option<&PositionColumn>, ) -> Result { // Try process group by unnest let input = self.try_process_aggregate_unnest(input)?; @@ -504,6 +510,13 @@ impl SqlToRel<'_, S> { } else { // Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL let mut unnest_options = UnnestOptions::new().with_preserve_nulls(false); + // Only attach the WITH ORDINALITY / WITH OFFSET position column on the + // first iteration; nested unnests would otherwise pile on duplicates. + if i == 0 + && let Some(pos) = position + { + unnest_options = unnest_options.with_position(pos.to_owned()); + } let mut unnest_col_vec = vec![]; for (col, maybe_list_unnest) in unnest_columns.into_iter() { @@ -530,6 +543,12 @@ impl SqlToRel<'_, S> { } } + // Project the synthetic position column alongside the rewritten unnest values + // so callers (e.g. `SELECT *` over the table factor) can see it. + if let Some(pos) = &position { + intermediate_select_exprs.push(Expr::Column(Column::from_name(&pos.name))); + } + LogicalPlanBuilder::from(intermediate_plan) .project(intermediate_select_exprs)? .build() diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index faeb5d59578e5..8ee4edb4d4434 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -422,12 +422,44 @@ select a from unnest([1,2]) as t(a); 2 -## Unnest in from clause with offset is not supported -query error DataFusion error: This feature is not implemented: UNNEST table factor with offset is not supported yet -select * from unnest([1,2]) with offset; +## UNNEST WITH OFFSET (BigQuery-style, 0-indexed). Default position column name is `offset`. +query II +select * from unnest([10, 20, 30]) with offset; +---- +10 0 +20 1 +30 2 + +## WITH OFFSET AS renames the position column. +query II +select * from unnest([10, 20, 30]) with offset offset_alias; +---- +10 0 +20 1 +30 2 + +## UNNEST WITH ORDINALITY (Postgres / SQL standard, 1-indexed). Default name is `ordinality`. +query II +select * from unnest([10, 20, 30]) with ordinality; +---- +10 1 +20 2 +30 3 + +## WITH ORDINALITY combined with the PG-style column-list table alias renames both columns. +query II +select v, ord from unnest([10, 20, 30]) with ordinality as t(v, ord); +---- +10 1 +20 2 +30 3 + +## WITH ORDINALITY and WITH OFFSET are mutually exclusive (no dialect accepts both). +query error DataFusion error: Error during planning: UNNEST cannot use WITH ORDINALITY and WITH OFFSET together +select * from unnest([1,2]) with ordinality with offset; -query error DataFusion error: This feature is not implemented: UNNEST table factor with offset is not supported yet -select * from unnest([1,2]) with offset offset_alias; +query error DataFusion error: Error during planning: UNNEST cannot use WITH ORDINALITY and WITH OFFSET together +select * from unnest([1,2]) with ordinality with offset offset_alias; ## More complex cases