diff --git a/Cargo.lock b/Cargo.lock index 895b3059f50c1..f154536fb1e11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1756,6 +1756,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-functions", "datafusion-functions-aggregate", + "datafusion-functions-json", "datafusion-functions-nested", "datafusion-functions-table", "datafusion-functions-window", @@ -2296,6 +2297,21 @@ dependencies = [ "rand 0.9.2", ] +[[package]] +name = "datafusion-functions-json" +version = "53.0.0" +dependencies = [ + "arrow", + "datafusion-common", + "datafusion-doc", + "datafusion-execution", + "datafusion-expr", + "datafusion-macros", + "log", + "serde_json", + "tokio", +] + [[package]] name = "datafusion-functions-nested" version = "53.0.0" diff --git a/Cargo.toml b/Cargo.toml index 64673c025d299..cc169b205c8ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "datafusion/ffi", "datafusion/functions", "datafusion/functions-aggregate", + "datafusion/functions-json", "datafusion/functions-aggregate-common", "datafusion/functions-table", "datafusion/functions-nested", @@ -135,6 +136,7 @@ datafusion-ffi = { path = "datafusion/ffi", version = "53.0.0" } datafusion-functions = { path = "datafusion/functions", version = "53.0.0" } datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "53.0.0" } datafusion-functions-aggregate-common = { path = "datafusion/functions-aggregate-common", version = "53.0.0" } +datafusion-functions-json = { path = "datafusion/functions-json", version = "53.0.0" } datafusion-functions-nested = { path = "datafusion/functions-nested", version = "53.0.0", default-features = false } datafusion-functions-table = { path = "datafusion/functions-table", version = "53.0.0" } datafusion-functions-window = { path = "datafusion/functions-window", version = "53.0.0" } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index c1230f7d5daa6..e7e5485cd7e93 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -39,6 +39,7 @@ all-features = true workspace = true [features] +json_expressions = ["datafusion-functions-json"] nested_expressions = ["datafusion-functions-nested"] # This feature is deprecated. Use the `nested_expressions` feature instead. array_expressions = ["nested_expressions"] @@ -131,6 +132,7 @@ datafusion-expr = { workspace = true, default-features = false } datafusion-expr-common = { workspace = true } datafusion-functions = { workspace = true } datafusion-functions-aggregate = { workspace = true } +datafusion-functions-json = { workspace = true, optional = true } datafusion-functions-nested = { workspace = true, default-features = false, optional = true } datafusion-functions-table = { workspace = true } datafusion-functions-window = { workspace = true } diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs index 8ef041e5bf640..a0e283ce866b4 100644 --- a/datafusion/core/src/execution/session_state_defaults.rs +++ b/datafusion/core/src/execution/session_state_defaults.rs @@ -27,6 +27,8 @@ use crate::datasource::file_format::json::JsonFormatFactory; use crate::datasource::file_format::parquet::ParquetFormatFactory; use crate::datasource::provider::DefaultTableFactory; use crate::execution::context::SessionState; +#[cfg(feature = "json_expressions")] +use crate::functions_json; #[cfg(feature = "nested_expressions")] use crate::functions_nested; use crate::{functions, functions_aggregate, functions_table, functions_window}; @@ -104,12 +106,18 @@ impl SessionStateDefaults { /// returns the list of default [`ScalarUDF`]s pub fn default_scalar_functions() -> Vec> { - #[cfg_attr(not(feature = "nested_expressions"), expect(unused_mut))] + #[cfg_attr( + not(any(feature = "nested_expressions", feature = "json_expressions")), + expect(unused_mut) + )] let mut functions: Vec> = functions::all_default_functions(); #[cfg(feature = "nested_expressions")] functions.append(&mut functions_nested::all_default_nested_functions()); + #[cfg(feature = "json_expressions")] + functions.append(&mut functions_json::all_default_json_functions()); + functions } @@ -150,9 +158,10 @@ impl SessionStateDefaults { file_formats } - /// registers all builtin functions - scalar, array and aggregate + /// registers all builtin functions - scalar, array, json, and aggregate pub fn register_builtin_functions(state: &mut SessionState) { Self::register_scalar_functions(state); + Self::register_json_functions(state); Self::register_array_functions(state); Self::register_aggregate_functions(state); } @@ -162,6 +171,15 @@ impl SessionStateDefaults { functions::register_all(state).expect("can not register built in functions"); } + /// registers all the builtin JSON functions + #[cfg_attr(not(feature = "json_expressions"), expect(unused_variables))] + pub fn register_json_functions(state: &mut SessionState) { + // register crate of JSON expressions (if enabled) + #[cfg(feature = "json_expressions")] + functions_json::register_all(state) + .expect("can not register JSON functions"); + } + /// registers all the builtin array functions #[cfg_attr(not(feature = "nested_expressions"), expect(unused_variables))] pub fn register_array_functions(state: &mut SessionState) { diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 1d8368f54ba20..2a3f273c3bb87 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -718,6 +718,7 @@ //! * [datafusion_expr]: [`LogicalPlan`], [`Expr`] and related logical planning structure //! * [datafusion_functions]: Scalar function packages //! * [datafusion_functions_aggregate]: Aggregate functions such as `MIN`, `MAX`, `SUM`, etc +//! * [datafusion_functions_json]: JSON scalar functions such as `json_get_str` //! * [datafusion_functions_nested]: Scalar function packages for `ARRAY`s, `MAP`s and `STRUCT`s //! * [datafusion_functions_table]: Table Functions such as `GENERATE_SERIES` //! * [datafusion_functions_window]: Window functions such as `ROW_NUMBER`, `RANK`, etc @@ -865,6 +866,12 @@ pub mod functions { pub use datafusion_functions::*; } +/// re-export of [`datafusion_functions_json`] crate, if "json_expressions" feature is enabled +pub mod functions_json { + #[cfg(feature = "json_expressions")] + pub use datafusion_functions_json::*; +} + /// re-export of [`datafusion_functions_nested`] crate, if "nested_expressions" feature is enabled pub mod functions_nested { #[cfg(feature = "nested_expressions")] diff --git a/datafusion/functions-json/Cargo.toml b/datafusion/functions-json/Cargo.toml new file mode 100644 index 0000000000000..69938af925fa7 --- /dev/null +++ b/datafusion/functions-json/Cargo.toml @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-functions-json" +description = "JSON function package for the DataFusion query engine" +keywords = ["datafusion", "json", "functions"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[package.metadata.docs.rs] +all-features = true + +# Note: add additional linter rules in lib.rs. +# Rust does not support workspace + new linter rules in subcrates yet +# https://github.com/rust-lang/cargo/issues/13157 +[lints] +workspace = true + +[lib] +name = "datafusion_functions_json" + +[features] +default = [] + +[dependencies] +arrow = { workspace = true } +datafusion-common = { workspace = true } +datafusion-doc = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true, default-features = false } +datafusion-macros = { workspace = true } +log = { workspace = true } +serde_json = { workspace = true } + +[dev-dependencies] +arrow = { workspace = true, features = ["test_utils"] } +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/datafusion/functions-json/README.md b/datafusion/functions-json/README.md new file mode 100644 index 0000000000000..c810b59d409ad --- /dev/null +++ b/datafusion/functions-json/README.md @@ -0,0 +1,43 @@ + + +# datafusion-functions-json + +JSON scalar functions for [DataFusion](https://datafusion.apache.org/). + +This crate provides JSON manipulation functions operating on JSON-encoded strings. +Based on the [datafusion-functions-json](https://github.com/datafusion-contrib/datafusion-functions-json) +community crate. + +## Functions + +| Function | Description | +|----------|-------------| +| `json_get_str(json, key1, ...)` | Extract a string value from a JSON string at the given path | + +## Usage + +These functions are registered automatically when the `json_expressions` feature +is enabled on the `datafusion` crate. They can also be registered manually: + +```rust +use datafusion_functions_json; +// registry is a FunctionRegistry, e.g. SessionState +datafusion_functions_json::register_all(&mut registry)?; +``` diff --git a/datafusion/functions-json/src/json_get_str.rs b/datafusion/functions-json/src/json_get_str.rs new file mode 100644 index 0000000000000..fad52466ef9a1 --- /dev/null +++ b/datafusion/functions-json/src/json_get_str.rs @@ -0,0 +1,463 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`JsonGetStr`] UDF implementation for extracting string values from JSON. + +use arrow::array::{Array, ArrayRef, AsArray, StringArray, StringBuilder}; +use arrow::datatypes::DataType; +use datafusion_common::{Result, ScalarValue, exec_err, plan_err}; +use datafusion_expr::{ + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, +}; +use datafusion_macros::user_doc; +use std::sync::Arc; + +#[user_doc( + doc_section(label = "JSON Functions"), + description = r#"Extract a string value from a JSON string at the given path. + +The path is specified as one or more keys (strings for object access) or +indices (integers for array access). Returns NULL if the path does not exist +or the value at the path is not a string."#, + syntax_example = "json_get_str(json_string, key1[, key2, ...])", + sql_example = r#"```sql +> select json_get_str('{"a": {"b": "hello"}}', 'a', 'b'); ++-----------------------------------------------------------+ +| json_get_str(Utf8("{"a": {"b": "hello"}}"),Utf8("a"),Utf8("b")) | ++-----------------------------------------------------------+ +| hello | ++-----------------------------------------------------------+ +```"#, + argument( + name = "json_string", + description = "A string containing valid JSON data." + ), + argument( + name = "keys", + description = "One or more path keys (string for object key, integer for array index)." + ) +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct JsonGetStr { + signature: Signature, +} + +impl Default for JsonGetStr { + fn default() -> Self { + Self::new() + } +} + +impl JsonGetStr { + pub fn new() -> Self { + Self { + signature: Signature::new( + TypeSignature::UserDefined, + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for JsonGetStr { + fn name(&self) -> &str { + "json_get_str" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + if arg_types.len() < 2 { + return plan_err!( + "json_get_str requires at least 2 arguments (json_string, key), got {}", + arg_types.len() + ); + } + // First arg must be a string type; remaining are path keys (string or integer) + let json_type = match &arg_types[0] { + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { + arg_types[0].clone() + } + DataType::Null => DataType::Utf8, + other => { + return plan_err!( + "json_get_str first argument must be a string type, got {other}" + ); + } + }; + let mut coerced = vec![json_type]; + for (i, dt) in arg_types[1..].iter().enumerate() { + let coerced_type = match dt { + DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => dt.clone(), + DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 + | DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { + dt.clone() + } + DataType::Null => DataType::Utf8, + other => { + return plan_err!( + "json_get_str path argument {} must be a string or integer type, got {other}", + i + 1 + ); + } + }; + coerced.push(coerced_type); + } + Ok(coerced) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + + let json_arg = &args.args[0]; + let path_args = &args.args[1..]; + + // Extract path keys from the scalar arguments + let path_keys = path_args + .iter() + .map(|arg| match arg { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => Ok(PathKey::Key(s.clone())), + ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => { + Ok(PathKey::Key(s.clone())) + } + ColumnarValue::Scalar(ScalarValue::Utf8View(Some(s))) => { + Ok(PathKey::Key(s.clone())) + } + ColumnarValue::Scalar(ScalarValue::Int64(Some(i))) => Ok(PathKey::Index(*i as usize)), + ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => Ok(PathKey::Index(*i as usize)), + ColumnarValue::Scalar(ScalarValue::UInt64(Some(i))) => Ok(PathKey::Index(*i as usize)), + ColumnarValue::Scalar(s) if s.is_null() => Ok(PathKey::Null), + _ => exec_err!( + "json_get_str path arguments must be scalar strings or integers, got {:?}", + arg.data_type() + ), + }) + .collect::>>()?; + + // If any path key is null, the result is null + if path_keys.iter().any(|k| matches!(k, PathKey::Null)) { + return match json_arg { + ColumnarValue::Array(arr) => { + let null_array: ArrayRef = Arc::new(StringArray::new_null(arr.len())); + Ok(ColumnarValue::Array(null_array)) + } + ColumnarValue::Scalar(_) => { + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))) + } + }; + } + + match json_arg { + ColumnarValue::Array(array) => { + let len = array.len(); + let mut builder = StringBuilder::with_capacity(len, len * 32); + + match array.data_type() { + DataType::Utf8 => { + let arr = array.as_string::(); + for i in 0..len { + if arr.is_null(i) { + builder.append_null(); + } else { + match extract_str_at_path(arr.value(i), &path_keys) { + Some(s) => builder.append_value(&s), + None => builder.append_null(), + } + } + } + } + DataType::LargeUtf8 => { + let arr = array.as_string::(); + for i in 0..len { + if arr.is_null(i) { + builder.append_null(); + } else { + match extract_str_at_path(arr.value(i), &path_keys) { + Some(s) => builder.append_value(&s), + None => builder.append_null(), + } + } + } + } + DataType::Utf8View => { + let arr = array.as_string_view(); + for i in 0..len { + if arr.is_null(i) { + builder.append_null(); + } else { + match extract_str_at_path(arr.value(i), &path_keys) { + Some(s) => builder.append_value(&s), + None => builder.append_null(), + } + } + } + } + other => { + return exec_err!( + "json_get_str first argument must be a string type, got {other:?}" + ); + } + } + + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + } + ColumnarValue::Scalar(scalar) => { + let json_str = match scalar { + ScalarValue::Utf8(Some(s)) + | ScalarValue::LargeUtf8(Some(s)) + | ScalarValue::Utf8View(Some(s)) => s, + _ => return Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None))), + }; + + let result = extract_str_at_path(json_str, &path_keys); + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(result))) + } + } + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} + +/// Represents a path element for navigating JSON. +#[derive(Debug, Clone)] +enum PathKey { + Key(String), + Index(usize), + Null, +} + +/// Navigate a JSON string using the given path and extract a string value. +/// +/// Returns `None` if: +/// - The JSON is invalid +/// - The path does not exist +/// - The value at the path is not a JSON string +fn extract_str_at_path(json_str: &str, path: &[PathKey]) -> Option { + let mut value: serde_json::Value = serde_json::from_str(json_str).ok()?; + + for key in path { + value = match key { + PathKey::Key(k) => value.get(k)?.clone(), + PathKey::Index(i) => value.get(*i)?.clone(), + PathKey::Null => return None, + }; + } + + match value { + serde_json::Value::String(s) => Some(s), + _ => None, + } +} + +/// Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of `json_get_str` +pub fn json_get_str_udf() -> Arc { + static INSTANCE: std::sync::LazyLock> = + std::sync::LazyLock::new(|| { + Arc::new(datafusion_expr::ScalarUDF::new_from_impl(JsonGetStr::new())) + }); + Arc::clone(&INSTANCE) +} + +/// Create an [`Expr`](datafusion_expr::Expr) that calls `json_get_str` +pub fn json_get_str(args: Vec) -> datafusion_expr::Expr { + datafusion_expr::Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction::new_udf( + json_get_str_udf(), + args, + )) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::StringArray; + use arrow::datatypes::Field; + use datafusion_common::ScalarValue; + use datafusion_common::config::ConfigOptions; + use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; + + fn invoke_json_get_str( + json_values: ColumnarValue, + keys: Vec, + num_rows: usize, + ) -> Result { + let udf = JsonGetStr::new(); + let mut args = vec![json_values]; + args.extend(keys); + let arg_fields: Vec<_> = args + .iter() + .map(|a| Field::new("a", a.data_type(), true).into()) + .collect(); + + udf.invoke_with_args(ScalarFunctionArgs { + args, + arg_fields, + number_rows: num_rows, + return_field: Field::new("f", DataType::Utf8, true).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + } + + #[test] + fn test_simple_object_key() -> Result<()> { + let json = ColumnarValue::Scalar(ScalarValue::Utf8(Some( + r#"{"name": "DataFusion"}"#.to_string(), + ))); + let key = ColumnarValue::Scalar(ScalarValue::Utf8(Some("name".to_string()))); + + let result = invoke_json_get_str(json, vec![key], 1)?; + match result { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { + assert_eq!(s, "DataFusion"); + } + other => panic!("expected Utf8 scalar, got {other:?}"), + } + Ok(()) + } + + #[test] + fn test_nested_path() -> Result<()> { + let json = ColumnarValue::Scalar(ScalarValue::Utf8(Some( + r#"{"a": {"b": {"c": "deep"}}}"#.to_string(), + ))); + let keys = vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("a".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("b".to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some("c".to_string()))), + ]; + + let result = invoke_json_get_str(json, keys, 1)?; + match result { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { + assert_eq!(s, "deep"); + } + other => panic!("expected Utf8 scalar, got {other:?}"), + } + Ok(()) + } + + #[test] + fn test_array_index() -> Result<()> { + let json = ColumnarValue::Scalar(ScalarValue::Utf8(Some( + r#"{"items": ["zero", "one", "two"]}"#.to_string(), + ))); + let keys = vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(Some("items".to_string()))), + ColumnarValue::Scalar(ScalarValue::Int64(Some(1))), + ]; + + let result = invoke_json_get_str(json, keys, 1)?; + match result { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => { + assert_eq!(s, "one"); + } + other => panic!("expected Utf8 scalar, got {other:?}"), + } + Ok(()) + } + + #[test] + fn test_missing_key_returns_null() -> Result<()> { + let json = ColumnarValue::Scalar(ScalarValue::Utf8(Some( + r#"{"a": "hello"}"#.to_string(), + ))); + let key = + ColumnarValue::Scalar(ScalarValue::Utf8(Some("nonexistent".to_string()))); + + let result = invoke_json_get_str(json, vec![key], 1)?; + match result { + ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} + other => panic!("expected null Utf8 scalar, got {other:?}"), + } + Ok(()) + } + + #[test] + fn test_non_string_value_returns_null() -> Result<()> { + let json = ColumnarValue::Scalar(ScalarValue::Utf8(Some( + r#"{"count": 42}"#.to_string(), + ))); + let key = ColumnarValue::Scalar(ScalarValue::Utf8(Some("count".to_string()))); + + let result = invoke_json_get_str(json, vec![key], 1)?; + match result { + ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} + other => panic!("expected null Utf8 scalar (non-string JSON value), got {other:?}"), + } + Ok(()) + } + + #[test] + fn test_invalid_json_returns_null() -> Result<()> { + let json = ColumnarValue::Scalar(ScalarValue::Utf8(Some( + "not valid json".to_string(), + ))); + let key = ColumnarValue::Scalar(ScalarValue::Utf8(Some("a".to_string()))); + + let result = invoke_json_get_str(json, vec![key], 1)?; + match result { + ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} + other => panic!("expected null Utf8 scalar (invalid json), got {other:?}"), + } + Ok(()) + } + + #[test] + fn test_null_json_returns_null() -> Result<()> { + let json = ColumnarValue::Scalar(ScalarValue::Utf8(None)); + let key = ColumnarValue::Scalar(ScalarValue::Utf8(Some("a".to_string()))); + + let result = invoke_json_get_str(json, vec![key], 1)?; + match result { + ColumnarValue::Scalar(ScalarValue::Utf8(None)) => {} + other => panic!("expected null Utf8 scalar, got {other:?}"), + } + Ok(()) + } + + #[test] + fn test_array_input() -> Result<()> { + let json_array = ColumnarValue::Array(Arc::new(StringArray::from(vec![ + Some(r#"{"name": "Alice"}"#), + Some(r#"{"name": "Bob"}"#), + None, + Some(r#"{"other": "value"}"#), + ]))); + let key = ColumnarValue::Scalar(ScalarValue::Utf8(Some("name".to_string()))); + + let result = invoke_json_get_str(json_array, vec![key], 4)?; + match result { + ColumnarValue::Array(arr) => { + let string_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(string_arr.len(), 4); + assert_eq!(string_arr.value(0), "Alice"); + assert_eq!(string_arr.value(1), "Bob"); + assert!(string_arr.is_null(2)); + assert!(string_arr.is_null(3)); // key "name" not found + } + other => panic!("expected array result, got {other:?}"), + } + Ok(()) + } +} diff --git a/datafusion/functions-json/src/lib.rs b/datafusion/functions-json/src/lib.rs new file mode 100644 index 0000000000000..25d90ba082ce5 --- /dev/null +++ b/datafusion/functions-json/src/lib.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg", + html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg" +)] +#![cfg_attr(docsrs, feature(doc_cfg))] +// Make sure fast / cheap clones on Arc are explicit: +// https://github.com/apache/datafusion/issues/11143 +#![deny(clippy::clone_on_ref_ptr)] +#![cfg_attr(test, allow(clippy::needless_pass_by_value))] + +//! JSON Functions for [DataFusion]. +//! +//! This crate contains JSON manipulation functions implemented using the +//! extension API. These functions operate on JSON-encoded strings and provide +//! path-based extraction of values. +//! +//! [DataFusion]: https://crates.io/crates/datafusion +//! +//! # Available Functions +//! +//! | Function | Description | +//! |----------|-------------| +//! | [`json_get_str`] | Extract a string value from a JSON string at the given path | +//! +//! # Usage +//! +//! You can register all functions using the [`register_all`] function. +//! +//! ``` +//! # fn main() -> datafusion_common::Result<()> { +//! # let mut registry = datafusion_execution::registry::MemoryFunctionRegistry::new(); +//! # use datafusion_execution::FunctionRegistry; +//! use datafusion_functions_json; +//! datafusion_functions_json::register_all(&mut registry)?; +//! # Ok(()) +//! # } +//! ``` + +pub mod json_get_str; + +use datafusion_common::Result; +use datafusion_execution::FunctionRegistry; +use datafusion_expr::ScalarUDF; +use log::debug; +use std::sync::Arc; + +/// Fluent-style API for creating `Expr`s +pub mod expr_fn { + pub use super::json_get_str::json_get_str; +} + +/// Return all default JSON functions +pub fn all_default_json_functions() -> Vec> { + vec![json_get_str::json_get_str_udf()] +} + +/// Registers all JSON functions with a [`FunctionRegistry`] +pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { + let functions = all_default_json_functions(); + functions.into_iter().try_for_each(|udf| { + let existing_udf = registry.register_udf(udf)?; + if let Some(existing_udf) = existing_udf { + debug!("Overwrite existing UDF: {}", existing_udf.name()); + } + Ok(()) as Result<()> + })?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use crate::all_default_json_functions; + use datafusion_common::Result; + use std::collections::HashSet; + + #[test] + fn test_no_duplicate_name() -> Result<()> { + let mut names = HashSet::new(); + for func in all_default_json_functions() { + assert!( + names.insert(func.name().to_string().to_lowercase()), + "duplicate function name: {}", + func.name() + ); + for alias in func.aliases() { + assert!( + names.insert(alias.to_string().to_lowercase()), + "duplicate function name: {alias}" + ); + } + } + Ok(()) + } +} diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index d7bb2583c9d8c..aa2a8a777abc6 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -46,7 +46,7 @@ bigdecimal = { workspace = true } bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.60", features = ["derive", "env"] } -datafusion = { workspace = true, default-features = true, features = ["avro"] } +datafusion = { workspace = true, default-features = true, features = ["avro", "json_expressions"] } datafusion-spark = { workspace = true, features = ["core"] } datafusion-substrait = { workspace = true, default-features = true, optional = true } futures = { workspace = true } diff --git a/datafusion/sqllogictest/test_files/json_functions.slt b/datafusion/sqllogictest/test_files/json_functions.slt new file mode 100644 index 0000000000000..6421065842bd8 --- /dev/null +++ b/datafusion/sqllogictest/test_files/json_functions.slt @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +########## +## JSON Function Tests +########## + +# Setup test table +statement ok +CREATE TABLE json_test AS VALUES + ('{"name": "Alice", "age": 30}'), + ('{"name": "Bob", "nested": {"key": "val"}}'); + +# Simple key extraction +query T +SELECT json_get_str(column1, 'name') FROM json_test; +---- +Alice +Bob + +# Nested key extraction — first row has no "nested" key, returns NULL +query T +SELECT json_get_str(column1, 'nested', 'key') FROM json_test; +---- +NULL +val + +# Non-string value returns NULL +query T +SELECT json_get_str('{"count": 42}', 'count'); +---- +NULL + +# Missing key returns NULL +query T +SELECT json_get_str('{"a": "hello"}', 'nonexistent'); +---- +NULL + +# Invalid JSON returns NULL +query T +SELECT json_get_str('not valid json', 'key'); +---- +NULL + +# Null JSON input returns NULL +query T +SELECT json_get_str(NULL, 'key'); +---- +NULL + +# Too few arguments — should fail at planning time +statement error +SELECT json_get_str('{"a": 1}'); + +# Clean up +statement ok +DROP TABLE json_test;