Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.parser.extensions

import org.apache.paimon.spark.catalog.SupportV1Function
import org.apache.paimon.spark.catalog.functions.SQLFunctionConverter
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier

/**
* Simplified version of CreatePaimonSQLFunctionCommand for Spark 4.0. Persists the function without
* full body analysis (no type inference, no validation). The full version in spark4-common is used
* by Spark 4.1+.
*/
case class CreatePaimonSQLFunctionCommand(
catalog: SupportV1Function,
name: FunctionIdentifier,
inputParamText: Option[String],
returnTypeText: String,
exprText: Option[String],
queryText: Option[String],
comment: Option[String],
isDeterministic: Option[Boolean],
containsSQL: Option[Boolean],
isTableFunc: Boolean,
ignoreIfExists: Boolean,
replace: Boolean)
extends PaimonLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
require(
returnTypeText != null && returnTypeText.trim.nonEmpty,
s"SQL function $name requires an explicit RETURNS clause on Spark 4.0.")

val parser = sparkSession.sessionState.sqlParser
val paimonFunction = SQLFunctionConverter.toPaimonFunction(
name,
inputParamText,
returnTypeText,
exprText,
queryText,
comment,
isDeterministic,
containsSQL,
parser)

if (replace) {
catalog.dropV1Function(name, true)
}
catalog.createV1Function(paimonFunction, ignoreIfExists)
Nil
}

override def simpleString(maxFields: Int): String = {
s"CreatePaimonSQLFunctionCommand: $name"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,32 +88,74 @@ case class DescribePaimonV1FunctionCommand(
s"File Resources: ${functionDefinition.fileResources().asScala.map(_.uri()).mkString(", ")}")
}
case sqlFunctionDefinition: FunctionDefinition.SQLFunctionDefinition =>
rows += Row(s"Function: ${function.fullName()}")
rows += Row("Type: SCALAR")
val buffer = new ArrayBuffer[(String, String)]
buffer += ("Function:" -> function.fullName())
buffer += ("Type:" -> "SCALAR")
val inputParams = function.inputParams()
if (inputParams.isPresent && !inputParams.get().isEmpty) {
val params = inputParams
.get()
.asScala
.map(field => s"${field.name()} ${field.`type`().asSQLString()}")
.mkString(", ")
rows += Row(s"Input: $params")
val params = formatInputParams(inputParams.get().asScala)
buffer += ("Input:" -> params.head)
params.tail.foreach(s => buffer += ("" -> s))
} else {
buffer += ("Input:" -> "()")
}
val returnParams = function.returnParams()
if (returnParams.isPresent && !returnParams.get().isEmpty) {
rows += Row(s"Returns: ${returnParams.get().get(0).`type`().asSQLString()}")
buffer += ("Returns:" -> returnParams.get().get(0).`type`().asSQLString())
}
if (isExtended) {
Option(function.comment()).foreach(c => rows += Row(s"Comment: $c"))
rows += Row(s"Body: ${sqlFunctionDefinition.definition()}")
Option(function.comment()).foreach(c => buffer += ("Comment:" -> c))
buffer += ("Deterministic:" -> function.isDeterministic.toString)
val options = function.options()
Option(options.get("spark.sql-function.contains-sql"))
.map(_.toBoolean)
.foreach {
c =>
val dataAccess = if (c) "CONTAINS SQL" else "READS SQL DATA"
buffer += ("Data Access:" -> dataAccess)
}
val configs = options.asScala
.filter(_._1.startsWith("sqlConfig."))
.toSeq
.sortBy(_._1)
.map { case (k, v) => s"${k.stripPrefix("sqlConfig.")}=$v" }
if (configs.nonEmpty) {
buffer += ("Configs:" -> configs.head)
configs.tail.foreach(s => buffer += ("" -> s))
}
buffer += ("Body:" -> sqlFunctionDefinition.definition())
}
val keys = tabulate(buffer.map(_._1).toSeq)
val values = buffer.map(_._2)
keys.zip(values).foreach { case (key, value) => rows += Row(s"$key $value") }
case other =>
throw new UnsupportedOperationException(s"Unsupported function definition $other")
}

rows.toSeq
}

private def tabulate(inputs: Seq[String]): Seq[String] = {
val maxLen = inputs.map(_.length).max
inputs.map(_.padTo(maxLen, ' '))
}

private def formatInputParams(
params: Iterable[org.apache.paimon.types.DataField]): Seq[String] = {
val fields = params.toSeq
val names = tabulate(fields.map(_.name()))
val types = tabulate(fields.map(_.`type`().asSQLString()))
val defaults = fields.map {
f => if (isExtended) Option(f.defaultValue()).map(d => s" DEFAULT $d").getOrElse("") else ""
}
val comments = fields.map {
f => if (isExtended) Option(f.description()).map(c => s" '$c'").getOrElse("") else ""
}
names.zip(types).zip(defaults).zip(comments).map {
case (((name, dataType), default), comment) => s"$name $dataType$default$comment"
}
}

override def simpleString(maxFields: Int): String = {
s"DescribePaimonV1FunctionCommand: ${function.fullName()}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,30 @@ abstract class PaimonSQLFunctionTestBase extends PaimonSparkTestWithRestCatalogB
sql("CREATE FUNCTION area(width DOUBLE, height DOUBLE) RETURNS DOUBLE RETURN width * height")

val desc = sql("DESCRIBE FUNCTION area").collect().map(_.getString(0))
assert(desc.exists(_.contains("Type: SCALAR")), desc.mkString("\n"))
assert(desc.exists(_.contains("Input:")), desc.mkString("\n"))
assert(desc.exists(_.contains("SCALAR")), desc.mkString("\n"))
assert(desc.exists(_.contains("Input")), desc.mkString("\n"))
assert(desc.exists(_.contains("width")), desc.mkString("\n"))
assert(desc.exists(_.contains("Returns: DOUBLE")), desc.mkString("\n"))
assert(desc.exists(_.contains("DOUBLE")), desc.mkString("\n"))

val descExt = sql("DESCRIBE FUNCTION EXTENDED area").collect().map(_.getString(0))
assert(descExt.exists(_.contains("Deterministic")), descExt.mkString("\n"))
assert(descExt.exists(_.contains("width * height")), descExt.mkString("\n"))
}
}

test("Paimon SQL Function: describe function with comment") {
withUserDefinedFunction("inc" -> false) {
sql("CREATE FUNCTION inc(x INT) RETURNS INT COMMENT 'increment by one' RETURN x + 1")

val desc = sql("DESCRIBE FUNCTION inc").collect().map(_.getString(0))
assert(desc.exists(_.contains("SCALAR")), desc.mkString("\n"))

val descExt = sql("DESCRIBE FUNCTION EXTENDED inc").collect().map(_.getString(0))
assert(descExt.exists(_.contains("increment by one")), descExt.mkString("\n"))
assert(descExt.exists(_.contains("x + 1")), descExt.mkString("\n"))
}
}

test("Paimon SQL Function: show functions lists the created function") {
withUserDefinedFunction("area" -> false) {
sql("CREATE FUNCTION area(w DOUBLE, h DOUBLE) RETURNS DOUBLE RETURN w * h")
Expand All @@ -132,6 +146,79 @@ abstract class PaimonSQLFunctionTestBase extends PaimonSparkTestWithRestCatalogB
}
}

test("Paimon SQL Function: SQL configs captured at creation time") {
assume(gteqSpark4_1)
withUserDefinedFunction("div_func" -> false) {
// Create with ANSI enabled — division by zero should throw at query time.
sql("SET spark.sql.ansi.enabled=true")
sql("CREATE FUNCTION div_func(x INT) RETURNS DOUBLE RETURN 1 / x")
sql("SET spark.sql.ansi.enabled=false")

// Even though ANSI is now disabled in the session, the function was created with ANSI=true,
// so division by zero should still throw ArithmeticException.
val e = intercept[Exception] {
sql("SELECT div_func(0)").collect()
}
assert(
e.getMessage.contains("Division by zero") ||
e.getMessage.contains("ArithmeticException") ||
e.getMessage.contains("DIVIDE_BY_ZERO"))

sql("RESET spark.sql.ansi.enabled")
}
}

test("Paimon SQL Function: non-deterministic function body") {
assume(gteqSpark4_1)
withUserDefinedFunction("rnd" -> false) {
sql("CREATE FUNCTION rnd() RETURNS DOUBLE RETURN rand()")
val r1 = sql("SELECT rnd()").collect()(0).getDouble(0)
val r2 = sql("SELECT rnd()").collect()(0).getDouble(0)
assert(r1 >= 0.0 && r1 < 1.0)
assert(r2 >= 0.0 && r2 < 1.0)
}
}

test("Paimon SQL Function: reject aggregate in scalar function body") {
assume(gteqSpark4_1)
val e = intercept[Exception] {
sql("CREATE FUNCTION bad_agg(x INT) RETURNS INT RETURN SUM(x)")
}
assert(e.getMessage.contains("CANNOT_CONTAIN_COMPLEX_FUNCTIONS"))
}

test("Paimon SQL Function: reject window function in scalar function body") {
assume(gteqSpark4_1)
val e = intercept[Exception] {
sql("CREATE FUNCTION bad_win(x INT) RETURNS INT RETURN ROW_NUMBER() OVER (ORDER BY x)")
}
assert(e.getMessage.contains("CANNOT_CONTAIN_COMPLEX_FUNCTIONS"))
}

test("Paimon SQL Function: reject duplicate parameter names") {
assume(gteqSpark4_1)
val e = intercept[Exception] {
sql("CREATE FUNCTION bad_dup(x INT, x INT) RETURNS INT RETURN x + x")
}
assert(e.getMessage.toLowerCase.contains("duplicate"))
}

test("Paimon SQL Function: reject non-trailing defaults") {
assume(gteqSpark4_1)
val e = intercept[Exception] {
sql("CREATE FUNCTION bad_def(x INT DEFAULT 1, y INT) RETURNS INT RETURN x + y")
}
assert(e.getMessage.toLowerCase.contains("default"))
}

test("Paimon SQL Function: omitting RETURNS clause") {
assume(gteqSpark4_1)
withUserDefinedFunction("inc" -> false) {
sql("CREATE FUNCTION inc(x INT) RETURN x + 1")
checkAnswer(sql("SELECT inc(10)"), Row(11))
}
}

test("Paimon SQL Function: table function is not supported yet") {
val e = intercept[Exception] {
sql("""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import org.apache.spark.sql.types.{DataType => SparkDataType, StructType}

import java.util.{Collections, HashMap => JHashMap, List => JList}

import scala.collection.JavaConverters._

/** Converts between Spark SQLFunction and Paimon Function with a SQLFunctionDefinition body. */
object SQLFunctionConverter {

// Spark-specific metadata stored in Paimon Function.options().
private val IS_QUERY = "spark.sql-function.is-query"
private val DETERMINISTIC = "spark.sql-function.deterministic"
private val CONTAINS_SQL = "spark.sql-function.contains-sql"
// Paimon-specific option keys (prefixed to avoid collision with Spark properties).
private val PAIMON_OPTION_PREFIX = "spark.sql-function."
private val IS_QUERY = PAIMON_OPTION_PREFIX + "is-query"
private val DETERMINISTIC = PAIMON_OPTION_PREFIX + "deterministic"
private val CONTAINS_SQL = PAIMON_OPTION_PREFIX + "contains-sql"

/** Build a Paimon function from a parsed CREATE FUNCTION ... RETURN statement. */
def toPaimonFunction(
Expand All @@ -50,10 +53,11 @@ object SQLFunctionConverter {
comment: Option[String],
isDeterministic: Option[Boolean],
containsSQL: Option[Boolean],
parser: ParserInterface): PaimonFunction = {
parser: ParserInterface,
properties: Map[String, String] = Map.empty): PaimonFunction = {
require(
returnTypeText != null && returnTypeText.trim.nonEmpty,
s"SQL function $funcIdent must declare an explicit RETURNS type.")
s"SQL function $funcIdent must have a return type (explicit or inferred).")
val identifier = FunctionIdentifierConverter.toPaimonIdentifier(funcIdent)

val inputParams: JList[DataField] = inputParamText.filter(_.trim.nonEmpty) match {
Expand All @@ -78,12 +82,13 @@ object SQLFunctionConverter {
options.put(IS_QUERY, isQuery.toString)
isDeterministic.foreach(d => options.put(DETERMINISTIC, d.toString))
containsSQL.foreach(c => options.put(CONTAINS_SQL, c.toString))
properties.foreach { case (k, v) => options.put(k, v) }

new FunctionImpl(
identifier,
inputParams,
returnParams,
isDeterministic.getOrElse(true),
isDeterministic.getOrElse(true), // caller should always pass Some after analysis
Collections.singletonMap(FUNCTION_DEFINITION_NAME, FunctionDefinition.sql(body)),
comment.orNull,
options
Expand Down Expand Up @@ -139,7 +144,7 @@ object SQLFunctionConverter {
deterministic = deterministic,
containsSQL = Option(options.get(CONTAINS_SQL)).map(_.toBoolean),
isTableFunc = false,
properties = Map.empty
properties = options.asScala.filterNot(_._1.startsWith(PAIMON_OPTION_PREFIX)).toMap
)

SQLFunctionExpression(
Expand Down
Loading