Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub use schema_reference::SchemaReference;
pub use spans::{Location, Span, Spans};
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{ResolvedTableReference, TableReference};
pub use unnest::{RecursionUnnestOption, UnnestOptions};
pub use unnest::{IndexBase, PositionColumn, RecursionUnnestOption, UnnestOptions};
pub use utils::project_schema;

// These are hidden from docs purely to avoid polluting the public view of what this crate exports.
Expand Down
43 changes: 43 additions & 0 deletions datafusion/common/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ use crate::Column;
/// `recursions` instruct how a column should be unnested (e.g unnesting a column multiple
/// time, with depth = 1 and depth = 2). Any unnested column not being mentioned inside this
/// options is inferred to be unnested with depth = 1
///
/// If `position` is set, an additional column is appended to the output containing the
/// position of each element within its source list. The index base is selected by the
/// SQL spelling used: `WITH ORDINALITY` (Postgres, SQL standard) is 1-indexed,
/// `WITH OFFSET` (BigQuery) is 0-indexed.
#[derive(Debug, Clone, PartialEq, PartialOrd, Hash, Eq)]
pub struct UnnestOptions {
/// Should nulls in the input be preserved? Defaults to true
Expand All @@ -74,6 +79,9 @@ pub struct UnnestOptions {
/// declare them here. Any unnested columns not being mentioned inside this option
/// will be unnested with depth = 1
pub recursions: Vec<RecursionUnnestOption>,
/// If set, append a position column to the output (per-list element index).
/// Defaults to `None` (no position column emitted).
pub position: Option<PositionColumn>,
}

/// Instruction on how to unnest a column (mostly with a list type)
Expand All @@ -85,12 +93,41 @@ pub struct RecursionUnnestOption {
pub depth: usize,
}

/// The 0/1 index base for the position column emitted by `UNNEST`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum IndexBase {
/// 0-indexed (BigQuery `WITH OFFSET`, Snowflake `FLATTEN.INDEX`, Spark `posexplode`).
Zero,
/// 1-indexed (Postgres / SQL standard `WITH ORDINALITY`, Trino/Presto).
One,
}

/// Specification for the extra position column produced by `UNNEST WITH ORDINALITY`
/// (1-indexed) or `UNNEST WITH OFFSET` (0-indexed).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct PositionColumn {
/// Output column name (e.g. `"ordinality"`, `"offset"`, or a user alias).
pub name: String,
/// Whether the column is 0- or 1-indexed.
pub base: IndexBase,
}

impl PositionColumn {
pub fn new(name: impl Into<String>, base: IndexBase) -> Self {
Self {
name: name.into(),
base,
}
}
}

impl Default for UnnestOptions {
fn default() -> Self {
Self {
// default to true to maintain backwards compatible behavior
preserve_nulls: true,
recursions: vec![],
position: None,
}
}
}
Expand All @@ -113,4 +150,10 @@ impl UnnestOptions {
self.recursions.push(recursion);
self
}

/// Request a position column on the output (see [`PositionColumn`]).
pub fn with_position(mut self, position: PositionColumn) -> Self {
self.position = Some(position);
self
}
}
13 changes: 13 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4286,6 +4286,12 @@ impl Unnest {
// 4.unnest_col2_depth_1: int
// Meaning the placeholder column will be replaced by its unnested variation(s), note
// the plural.
// Pick one of the unnest target indices as the dependency anchor for the
// synthetic position column (it depends on the unnest happening at all,
// not on any one input column). Safe to unwrap because exec_columns was
// verified non-empty above.
let position_dep_index = *indices_to_unnest.keys().next().unwrap();

let fields = input_schema
.iter()
.enumerate()
Expand Down Expand Up @@ -4372,6 +4378,13 @@ impl Unnest {
.flatten()
.collect::<Vec<_>>();

let mut fields = fields;
if let Some(position) = &options.position {
let pos_field = Arc::new(Field::new(&position.name, DataType::Int64, true));
fields.push((None, pos_field));
dependency_indices.push(position_dep_index);
}

let metadata = input_schema.metadata().clone();
let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
// We can use the existing functional dependencies:
Expand Down
79 changes: 76 additions & 3 deletions datafusion/physical-plan/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use arrow_ord::cmp::lt;
use async_trait::async_trait;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{
Constraints, HashMap, HashSet, Result, UnnestOptions, exec_datafusion_err, exec_err,
internal_err,
Constraints, HashMap, HashSet, IndexBase, Result, UnnestOptions, exec_datafusion_err,
exec_err, internal_err,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -498,6 +498,7 @@ fn list_unnest_at_level(
temp_unnested_arrs: &mut HashMap<ListUnnest, ArrayRef>,
level_to_unnest: usize,
options: &UnnestOptions,
position_array_out: &mut Option<ArrayRef>,
) -> Result<Option<Vec<ArrayRef>>> {
// Extract unnestable columns at this level
let (arrs_to_unnest, list_unnest_specs): (Vec<Arc<dyn Array>>, Vec<_>) =
Expand Down Expand Up @@ -543,6 +544,20 @@ fn list_unnest_at_level(

// Create the take indices array for other columns
let take_indices = create_take_indices(unnested_length, total_length);

// At the final unnest level, materialize the WITH ORDINALITY / WITH OFFSET
// position column if the caller requested one. Multi-level (recursive) unnest
// currently always reports the leaf-level position.
if level_to_unnest == 1
&& let Some(position) = &options.position
{
*position_array_out = Some(Arc::new(create_position_indices(
unnested_length,
total_length,
position.base,
)) as ArrayRef);
}

unnested_temp_arrays
.into_iter()
.zip(list_unnest_specs.iter())
Expand Down Expand Up @@ -643,6 +658,7 @@ fn build_batch(
struct_column_indices: &HashSet<usize>,
options: &UnnestOptions,
) -> Result<Option<RecordBatch>> {
let mut position_array: Option<ArrayRef> = None;
let transformed = match list_type_columns.len() {
0 => flatten_struct_cols(batch.columns(), schema, struct_column_indices),
_ => {
Expand All @@ -669,6 +685,7 @@ fn build_batch(
&mut temp_unnested_result,
depth,
options,
&mut position_array,
)?
else {
return Ok(None);
Expand Down Expand Up @@ -740,7 +757,7 @@ fn build_batch(
)
.collect::<HashMap<_, _>>();

let ret = flatten_arrs
let mut ret = flatten_arrs
.into_iter()
.enumerate()
.flat_map(|(col_idx, arr)| {
Expand All @@ -754,6 +771,12 @@ fn build_batch(
})
.collect::<Vec<_>>();

// Append the WITH ORDINALITY / WITH OFFSET position column, if any.
// The Unnest logical plan placed this as the last schema field.
if let Some(pos_arr) = position_array.take() {
ret.push(pos_arr);
}

flatten_struct_cols(&ret, schema, struct_column_indices)
}
}?;
Expand Down Expand Up @@ -996,6 +1019,34 @@ fn create_take_indices(
builder.finish()
}

/// Builds the per-element position column for `UNNEST ... WITH ORDINALITY`
/// (1-indexed) or `UNNEST ... WITH OFFSET` (0-indexed).
///
/// Given the length array `[2, 3, 1]` and `base = Zero`, returns `[0, 1, 0, 1, 2, 0]`.
/// With `base = One`, returns `[1, 2, 1, 2, 3, 1]`.
fn create_position_indices(
length_array: &PrimitiveArray<Int64Type>,
capacity: usize,
base: IndexBase,
) -> PrimitiveArray<Int64Type> {
debug_assert!(
length_array.null_count() == 0,
"length array should not contain nulls"
);
let start: i64 = match base {
IndexBase::Zero => 0,
IndexBase::One => 1,
};
let mut builder = PrimitiveArray::<Int64Type>::builder(capacity);
for repeat in length_array.iter() {
let repeat = repeat.unwrap();
for i in 0..repeat {
builder.append_value(start + i);
}
}
builder.finish()
}

/// Create a batch of arrays based on an input `batch` and a `indices` array.
/// The `indices` array is used by the take kernel to repeat values in the arrays
/// that are marked with `true` in the `repeat_mask`. Arrays marked with `false`
Expand Down Expand Up @@ -1252,6 +1303,7 @@ mod tests {
&UnnestOptions {
preserve_nulls: true,
recursions: vec![],
position: None,
},
)?
.unwrap();
Expand Down Expand Up @@ -1339,6 +1391,7 @@ mod tests {
let options = UnnestOptions {
preserve_nulls,
recursions: vec![],
position: None,
};
let longest_length = find_longest_length(list_arrays, &options)?;
let expected_array = Int64Array::from(expected);
Expand Down Expand Up @@ -1392,4 +1445,24 @@ mod tests {
assert_eq!(take_indices, expected);
Ok(())
}

#[test]
fn test_create_position_indices_zero_based() -> Result<()> {
// BigQuery `WITH OFFSET` semantics: per-list position starts at 0.
let length_array = Int64Array::from(vec![2, 3, 1]);
let position = create_position_indices(&length_array, 6, IndexBase::Zero);
let expected = Int64Array::from(vec![0, 1, 0, 1, 2, 0]);
assert_eq!(position, expected);
Ok(())
}

#[test]
fn test_create_position_indices_one_based() -> Result<()> {
// Postgres / SQL-standard `WITH ORDINALITY` semantics: starts at 1.
let length_array = Int64Array::from(vec![2, 3, 1]);
let position = create_position_indices(&length_array, 6, IndexBase::One);
let expected = Int64Array::from(vec![1, 2, 1, 2, 3, 1]);
assert_eq!(position, expected);
Ok(())
}
}
16 changes: 16 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ message ColumnUnnestListRecursion {
message UnnestOptions {
bool preserve_nulls = 1;
repeated RecursionUnnestOption recursions = 2;
// Set when `UNNEST ... WITH ORDINALITY` (1-indexed) or `WITH OFFSET`
// (0-indexed) was requested; absent for plain UNNEST.
optional PositionColumn position = 3;
}

message RecursionUnnestOption {
Expand All @@ -341,6 +344,19 @@ message RecursionUnnestOption {
uint32 depth = 3;
}

// 0/1 index base for the synthetic position column produced by UNNEST.
enum IndexBase {
// 0-indexed (BigQuery `WITH OFFSET`, Snowflake `FLATTEN.INDEX`, Spark `posexplode`).
INDEX_BASE_ZERO = 0;
// 1-indexed (Postgres / SQL standard `WITH ORDINALITY`, Trino/Presto).
INDEX_BASE_ONE = 1;
}

message PositionColumn {
string name = 1;
IndexBase base = 2;
}

message UnionNode {
repeated LogicalPlanNode inputs = 1;
}
Expand Down
Loading