From 168ed58c78b08c80067725888fba361134c3214c Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 3 Apr 2026 11:34:18 -0700 Subject: [PATCH 01/14] chore: enable arrays_overlap --- .../source/user-guide/latest/compatibility.md | 3 - docs/source/user-guide/latest/expressions.md | 2 +- .../scala/org/apache/comet/serde/arrays.scala | 8 --- .../expressions/array/arrays_overlap.sql | 4 +- .../comet/CometArrayExpressionSuite.scala | 66 ++++++++++++++----- 5 files changed, 52 insertions(+), 31 deletions(-) diff --git a/docs/source/user-guide/latest/compatibility.md b/docs/source/user-guide/latest/compatibility.md index a6cced4e12..94dab0dba3 100644 --- a/docs/source/user-guide/latest/compatibility.md +++ b/docs/source/user-guide/latest/compatibility.md @@ -64,9 +64,6 @@ the [Comet Supported Expressions Guide](expressions.md) for more information on [#3346](https://github.com/apache/datafusion-comet/issues/3346) - **ArrayRemove**: Returns null when the element to remove is null, instead of removing null elements from the array. [#3173](https://github.com/apache/datafusion-comet/issues/3173) -- **ArraysOverlap**: Inconsistent behavior when arrays contain NULL values. [#3645](https://github.com/apache/datafusion-comet/issues/3645), [#2036](https://github.com/apache/datafusion-comet/issues/2036) - **ArrayUnion**: Sorts input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. 
[#3644](https://github.com/apache/datafusion-comet/issues/3644) diff --git a/docs/source/user-guide/latest/expressions.md b/docs/source/user-guide/latest/expressions.md index 136e1e454f..6db05f4321 100644 --- a/docs/source/user-guide/latest/expressions.md +++ b/docs/source/user-guide/latest/expressions.md @@ -246,7 +246,7 @@ Comet supports using the following aggregate functions within window contexts wi | ArrayRemove | No | Returns null when element is null instead of removing null elements ([#3173](https://github.com/apache/datafusion-comet/issues/3173)) | | ArrayRepeat | No | | | ArrayUnion | No | Behaves differently than spark. Comet sorts the input arrays before performing the union, while Spark preserves the order of the first array and appends unique elements from the second. | -| ArraysOverlap | No | | +| ArraysOverlap | Yes | | | CreateArray | Yes | | | ElementAt | Yes | Input must be an array. Map inputs are not supported. | | Flatten | Yes | | diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 47a6e91421..2c3aba2d66 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -246,14 +246,6 @@ object CometArrayMin extends CometExpressionSerde[ArrayMin] { } object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { - - override def getSupportLevel(expr: ArraysOverlap): SupportLevel = - Incompatible( - Some( - "Inconsistent behavior with NULL values" + - " (https://github.com/apache/datafusion-comet/issues/3645)" + - " (https://github.com/apache/datafusion-comet/issues/2036)")) - override def convert( expr: ArraysOverlap, inputs: Seq[Attribute], diff --git a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql index 27d28a7402..4b83377457 100644 --- 
a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql @@ -24,11 +24,11 @@ CREATE TABLE test_arrays_overlap(a array, b array) USING parquet statement INSERT INTO test_arrays_overlap VALUES (array(1, 2, 3), array(3, 4, 5)), (array(1, 2), array(3, 4)), (array(), array(1)), (NULL, array(1)), (array(1, NULL), array(NULL, 2)) -query ignore(https://github.com/apache/datafusion-comet/issues/3645) +query SELECT arrays_overlap(a, b) FROM test_arrays_overlap -- column + literal -query ignore(https://github.com/apache/datafusion-comet/issues/3645) +query SELECT arrays_overlap(a, array(3, 4, 5)) FROM test_arrays_overlap -- literal + column diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index fb5531a573..95df0b38eb 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -23,7 +23,7 @@ import scala.util.Random import org.apache.hadoop.fs.Path import org.apache.spark.sql.CometTestBase -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArraysOverlap, ArrayUnion} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayDistinct, ArrayExcept, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayRepeat, ArrayUnion} import org.apache.spark.sql.catalyst.expressions.{ArrayContains, ArrayRemove} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions._ @@ -545,27 +545,59 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("arrays_overlap") { - withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArraysOverlap]) -> "true") { - Seq(true, false).foreach { dictionaryEnabled => - withTempDir { 
dir => - withTempView("t1") { - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null")) - checkSparkAnswerAndOperator(sql( - "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null")) - checkSparkAnswerAndOperator(spark.sql( - "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1")); - } + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + withTempView("t1") { + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 10000) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator(sql( + "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null")) + checkSparkAnswerAndOperator(sql( + "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null")) + checkSparkAnswerAndOperator(spark.sql( + "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1")); } } } } + test("arrays_overlap - null handling behavior verification") { + withTempDir { dir => + withTempView("t1") { + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 100) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + + // Test case 1: Common 
element exists - should return true + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)) from t1 limit 1")) + + // Test case 2: No common elements, no nulls - should return false + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, 2), array(3, 4)) from t1 limit 1")) + + // Test case 3: No common elements, but null exists - Spark returns null (three-valued logic) + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null, 3), array(4, 5)) from t1 limit 1")) + + // Test case 4: Common element exists even with null - should return true + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null, 3), array(1, 4)) from t1 limit 1")) + + // Test case 5: Both arrays have null but no common non-null elements + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null), array(2, null)) from t1 limit 1")) + + // Test case 6: Empty arrays + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(), array(1, 2)) from t1 limit 1")) + } + } + } + test("array_compact") { // TODO fix for Spark 4.0.0 assume(!isSpark40Plus) From b617476e4624632e365d2f3e14ec7c95eb2f42a0 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 3 Apr 2026 12:48:47 -0700 Subject: [PATCH 02/14] fix: workaround arrays_overlap --- .../scala/org/apache/comet/serde/arrays.scala | 9 ++-- .../expressions/array/arrays_overlap.sql | 1 - .../comet/CometArrayExpressionSuite.scala | 48 +++++++++---------- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 2c3aba2d66..16d38bd520 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import scala.annotation.tailrec -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, 
ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, If, IsNotNull, IsNull, Literal, Reverse, Size} import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -250,8 +250,11 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { expr: ArraysOverlap, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val leftArrayExprProto = exprToProto(expr.children.head, inputs, binding) - val rightArrayExprProto = exprToProto(expr.children(1), inputs, binding) + val left = If(IsNull(expr.left), Literal.create(Array(null), expr.left.dataType), expr.left) + val right = + If(IsNull(expr.right), Literal.create(Array(null), expr.right.dataType), expr.right) + val leftArrayExprProto = exprToProto(left, inputs, binding) + val rightArrayExprProto = exprToProto(right, inputs, binding) val arraysOverlapScalarExpr = scalarFunctionExprToProtoWithReturnType( "array_has_any", diff --git a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql index 4b83377457..88cf38d211 100644 --- a/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql +++ b/spark/src/test/resources/sql-tests/expressions/array/arrays_overlap.sql @@ -15,7 +15,6 @@ -- specific language governing permissions and limitations -- under the License. 
--- Config: spark.comet.expression.ArraysOverlap.allowIncompatible=true -- ConfigMatrix: parquet.enable.dictionary=false,true statement diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 95df0b38eb..02be47a16a 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -565,36 +565,36 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("arrays_overlap - null handling behavior verification") { - withTempDir { dir => - withTempView("t1") { - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 100) - spark.read.parquet(path.toString).createOrReplaceTempView("t1") + withTable("t") { + sql("create table t using parquet as select CAST(NULL as array) a1 from range(1)") - // Test case 1: Common element exists - should return true - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)) from t1 limit 1")) + // Test case 1: Common element exists - should return true + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)) from t")) - // Test case 2: No common elements, no nulls - should return false - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, 2), array(3, 4)) from t1 limit 1")) + // Test case 2: No common elements, no nulls - should return false + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(1, 2), array(3, 4)) from t")) - // Test case 3: No common elements, but null exists - Spark returns null (three-valued logic) - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null, 3), array(4, 5)) from t1 limit 1")) + // Test case 3: No common elements, but null exists - Spark returns null (three-valued logic) + checkSparkAnswerAndOperator( 
+ sql("SELECT arrays_overlap(array(1, null, 3), array(4, 5)) from t")) - // Test case 4: Common element exists even with null - should return true - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null, 3), array(1, 4)) from t1 limit 1")) + // Test case 4: Common element exists even with null - should return true + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null, 3), array(1, 4)) from t")) - // Test case 5: Both arrays have null but no common non-null elements - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null), array(2, null)) from t1 limit 1")) + // Test case 5: Both arrays have null but no common non-null elements + checkSparkAnswerAndOperator( + sql("SELECT arrays_overlap(array(1, null), array(2, null)) from t")) - // Test case 6: Empty arrays - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(), array(1, 2)) from t1 limit 1")) - } + // Test case 6: Empty arrays + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(), array(1, 2)) from t")) + + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(null), array(null)) from t")) + + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a1, array(1, 2)) from t")) + + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a1, a1) from t")) } } From 6a68c52374960ccaeb6548c848e0d892d2fb1ff6 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 3 Apr 2026 15:04:16 -0700 Subject: [PATCH 03/14] fix: workaround arrays_overlap --- .../scala/org/apache/comet/serde/arrays.scala | 44 ++++++++++++++++--- .../comet/CometArrayExpressionSuite.scala | 44 +++++++------------ 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index 16d38bd520..d9b1369d02 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -21,7 +21,7 @@ 
package org.apache.comet.serde import scala.annotation.tailrec -import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, If, IsNotNull, IsNull, Literal, Reverse, Size} +import org.apache.spark.sql.catalyst.expressions.{ArrayAppend, ArrayContains, ArrayDistinct, ArrayExcept, ArrayFilter, ArrayInsert, ArrayIntersect, ArrayJoin, ArrayMax, ArrayMin, ArrayRemove, ArrayRepeat, ArraysOverlap, ArrayUnion, Attribute, CreateArray, ElementAt, Expression, Flatten, GetArrayItem, IsNotNull, Literal, Reverse, Size} import org.apache.spark.sql.catalyst.util.GenericArrayData import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -250,11 +250,8 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { expr: ArraysOverlap, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val left = If(IsNull(expr.left), Literal.create(Array(null), expr.left.dataType), expr.left) - val right = - If(IsNull(expr.right), Literal.create(Array(null), expr.right.dataType), expr.right) - val leftArrayExprProto = exprToProto(left, inputs, binding) - val rightArrayExprProto = exprToProto(right, inputs, binding) + val leftArrayExprProto = exprToProto(expr.left, inputs, binding) + val rightArrayExprProto = exprToProto(expr.right, inputs, binding) val arraysOverlapScalarExpr = scalarFunctionExprToProtoWithReturnType( "array_has_any", @@ -262,7 +259,40 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { false, leftArrayExprProto, rightArrayExprProto) - optExprWithInfo(arraysOverlapScalarExpr, expr, expr.children: _*) + + val leftIsNull = createUnaryExpr( + expr, + expr.left, + inputs, + binding, + (builder, unaryExpr) => builder.setIsNull(unaryExpr)) + val rightIsNull = 
createUnaryExpr( + expr, + expr.right, + inputs, + binding, + (builder, unaryExpr) => builder.setIsNull(unaryExpr)) + + val nullLiteralProto = exprToProto(Literal(null, BooleanType), inputs) + + if (arraysOverlapScalarExpr.isDefined && leftIsNull.isDefined && rightIsNull.isDefined && nullLiteralProto.isDefined) { + val caseWhenExpr = ExprOuterClass.CaseWhen + .newBuilder() + .addWhen(leftIsNull.get) + .addThen(nullLiteralProto.get) + .addWhen(rightIsNull.get) + .addThen(nullLiteralProto.get) + .setElseExpr(arraysOverlapScalarExpr.get) + .build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setCaseWhen(caseWhenExpr) + .build()) + } else { + withInfo(expr, expr.children: _*) + None + } } } diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 02be47a16a..4e28ffda3c 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -567,34 +567,22 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp test("arrays_overlap - null handling behavior verification") { withTable("t") { sql("create table t using parquet as select CAST(NULL as array) a1 from range(1)") - - // Test case 1: Common element exists - should return true - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)) from t")) - - // Test case 2: No common elements, no nulls - should return false - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(1, 2), array(3, 4)) from t")) - - // Test case 3: No common elements, but null exists - Spark returns null (three-valued logic) - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null, 3), array(4, 5)) from t")) - - // Test case 4: Common element exists even with null - should return true - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null, 3), 
array(1, 4)) from t")) - - // Test case 5: Both arrays have null but no common non-null elements - checkSparkAnswerAndOperator( - sql("SELECT arrays_overlap(array(1, null), array(2, null)) from t")) - - // Test case 6: Empty arrays - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(), array(1, 2)) from t")) - - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(null), array(null)) from t")) - - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a1, array(1, 2)) from t")) - - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a1, a1) from t")) + val data = Seq( + "array(1, 2, 3)", + "array(3, 4, 5)", + "array(1, 2)", + "array(3, 4)", + "array(1, null, 3)", + "array(4, 5)", + "array(1, 4)", + "array(1, null)", + "array(2, null)", + "array()", + "array(null)", + "a1") + for (y <- data; x <- data) { + checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) + } } } From d297ec4ef02327eaedf0650ea284424d54728472 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 3 Apr 2026 15:46:05 -0700 Subject: [PATCH 04/14] fix: workaround arrays_overlap --- spark/src/main/scala/org/apache/comet/serde/arrays.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index d9b1369d02..0c121e02ca 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -275,7 +275,8 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { val nullLiteralProto = exprToProto(Literal(null, BooleanType), inputs) - if (arraysOverlapScalarExpr.isDefined && leftIsNull.isDefined && rightIsNull.isDefined && nullLiteralProto.isDefined) { + if (arraysOverlapScalarExpr.isDefined && leftIsNull.isDefined && + rightIsNull.isDefined && nullLiteralProto.isDefined) { val caseWhenExpr = ExprOuterClass.CaseWhen .newBuilder() 
.addWhen(leftIsNull.get) From 8108dd19d424db99f803e6c36c1ddfb51ea038e3 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 8 Apr 2026 14:10:53 -0700 Subject: [PATCH 05/14] fix: workaround arrays_overlap --- .../comet/CometArrayExpressionSuite.scala | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 4e28ffda3c..7e0eece96e 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -565,23 +565,26 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("arrays_overlap - null handling behavior verification") { - withTable("t") { - sql("create table t using parquet as select CAST(NULL as array) a1 from range(1)") - val data = Seq( - "array(1, 2, 3)", - "array(3, 4, 5)", - "array(1, 2)", - "array(3, 4)", - "array(1, null, 3)", - "array(4, 5)", - "array(1, 4)", - "array(1, null)", - "array(2, null)", - "array()", - "array(null)", - "a1") - for (y <- data; x <- data) { - checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) + withSQLConf("spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + withTable("t") { + sql("create table t using parquet as select CAST(NULL as array) a1 from range(1)") + val data = Seq( + "array(1, 2, 3)", + "array(3, 4, 5)", + "array(1, 2)", + "array(3, 4)", + "array(1, NULL, 3)", + "array(4, 5)", + "array(1, 4)", + "array(1, NULL)", + "array(2, NULL)", + "array(NULL, 2)", + "array()", + "array(NULL)", + "a1") + for (y <- data; x <- data) { + checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) + } } } } From bcf8687f5e86053207eb2321907d466a64537c33 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 8 Apr 2026 17:48:15 -0700 
Subject: [PATCH 06/14] test --- .../comet/CometArrayExpressionSuite.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 7e0eece96e..1e5d27bed1 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -579,16 +579,37 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp "array(1, NULL)", "array(2, NULL)", "array(NULL, 2)", + "array(1)", + "array(2)", "array()", "array(NULL)", "a1") for (y <- data; x <- data) { + println(y, x) checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) } } } } + test("s") { + withTable("t") { + sql("CREATE TABLE t(a array, b array) USING parquet") + // sql("INSERT INTO t VALUES (array(1, NULL), array(NULL, 2))") // true -> null + // sql("INSERT INTO t VALUES (array(1, NULL), array(2))") // false -> null + // sql("INSERT INTO t VALUES (array(1, NULL), NULL)") // null -> null + // sql("INSERT INTO t VALUES (array(1, NULL), array(1))") // true -> true + // sql("INSERT INTO t VALUES (array(NULL, 1), array(1))") // true -> true + // sql("INSERT INTO t VALUES (array(NULL, 1), array(1, NULL))") // true -> true + // sql("INSERT INTO t VALUES (array(NULL, 1), array())") // false -> false + sql("INSERT INTO t VALUES (array(NULL, 4), array(1, 2, 3))") // false -> false + sql("SELECT arrays_overlap(a, b) FROM t").show() + checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a, b) FROM t")) + // sql("SELECT arrays_overlap(array(1, NULL), array(NULL, 2)) FROM t").show() + // checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(1, NULL), array(NULL, 2)) FROM t")) + } + } + test("array_compact") { // TODO fix for Spark 4.0.0 assume(!isSpark40Plus) From d93db815af4df9590a90f6ab1fb87f5f9103a699 Mon Sep 17 00:00:00 2001 From: 
Kazuyuki Tanimura Date: Wed, 8 Apr 2026 19:42:27 -0700 Subject: [PATCH 07/14] fix: workaround arrays_overlap --- .../src/array_funcs/arrays_overlap.rs | 380 ++++++++++++++++++ native/spark-expr/src/array_funcs/mod.rs | 2 + native/spark-expr/src/comet_scalar_funcs.rs | 5 +- .../scala/org/apache/comet/serde/arrays.scala | 2 +- .../comet/CometArrayExpressionSuite.scala | 3 +- 5 files changed, 388 insertions(+), 4 deletions(-) create mode 100644 native/spark-expr/src/array_funcs/arrays_overlap.rs diff --git a/native/spark-expr/src/array_funcs/arrays_overlap.rs b/native/spark-expr/src/array_funcs/arrays_overlap.rs new file mode 100644 index 0000000000..2aef0e46d8 --- /dev/null +++ b/native/spark-expr/src/array_funcs/arrays_overlap.rs @@ -0,0 +1,380 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Spark-compatible `arrays_overlap` with correct SQL three-valued null logic. +//! +//! DataFusion's `array_has_any` uses `RowConverter` for element comparison, which +//! treats NULL == NULL as true (grouping semantics). Spark's `arrays_overlap` uses +//! SQL equality where NULL == NULL is unknown (null). This implementation correctly +//! returns: +//! - true if any non-null element appears in both arrays +//! 
- null if no definite overlap but either array contains null elements +//! - false if no overlap and neither array contains null elements + +use arrow::array::{Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait}; +use arrow::compute::kernels::cmp::eq; +use arrow::datatypes::{DataType, FieldRef}; +use datafusion::common::{exec_err, utils::take_function_args, Result, ScalarValue}; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkArraysOverlap { + signature: Signature, +} + +impl Default for SparkArraysOverlap { + fn default() -> Self { + Self::new() + } +} + +impl SparkArraysOverlap { + pub fn new() -> Self { + Self { + signature: Signature::new(TypeSignature::Any(2), Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for SparkArraysOverlap { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "spark_arrays_overlap" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Boolean) + } + + fn return_field_from_args( + &self, + _args: datafusion::logical_expr::ReturnFieldArgs, + ) -> Result { + Ok(Arc::new(arrow::datatypes::Field::new( + self.name(), + DataType::Boolean, + true, + ))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [left, right] = take_function_args(self.name(), &args.args)?; + + match (left, right) { + (ColumnarValue::Array(left_arr), ColumnarValue::Array(right_arr)) => { + let result = match (left_arr.data_type(), right_arr.data_type()) { + (DataType::List(_), DataType::List(_)) => arrays_overlap_list::( + left_arr.as_any().downcast_ref().unwrap(), + right_arr.as_any().downcast_ref().unwrap(), + )?, + (DataType::LargeList(_), DataType::LargeList(_)) => arrays_overlap_list::( + left_arr.as_any().downcast_ref().unwrap(), 
+ right_arr.as_any().downcast_ref().unwrap(), + )?, + (l, r) => { + return exec_err!( + "spark_arrays_overlap does not support types '{l}' and '{r}'" + ) + } + }; + Ok(ColumnarValue::Array(result)) + } + (left, right) => { + // Handle scalar inputs by converting to arrays + let left_arr = left.to_array(1)?; + let right_arr = right.to_array(1)?; + let result = match (left_arr.data_type(), right_arr.data_type()) { + (DataType::List(_), DataType::List(_)) => arrays_overlap_list::( + left_arr.as_any().downcast_ref().unwrap(), + right_arr.as_any().downcast_ref().unwrap(), + )?, + (DataType::LargeList(_), DataType::LargeList(_)) => arrays_overlap_list::( + left_arr.as_any().downcast_ref().unwrap(), + right_arr.as_any().downcast_ref().unwrap(), + )?, + (l, r) => { + return exec_err!( + "spark_arrays_overlap does not support types '{l}' and '{r}'" + ) + } + }; + let scalar = ScalarValue::try_from_array(&result, 0)?; + Ok(ColumnarValue::Scalar(scalar)) + } + } + } +} + +/// Spark-compatible arrays_overlap with SQL three-valued null logic. +/// +/// For each row, compares elements of two list arrays and returns: +/// - null if either array is null +/// - true if any non-null element appears in both arrays +/// - null if no definite overlap but either array contains null elements +/// - false otherwise +fn arrays_overlap_list( + left: &GenericListArray, + right: &GenericListArray, +) -> Result { + let len = left.len(); + let mut builder = BooleanArray::builder(len); + + for i in 0..len { + if left.is_null(i) || right.is_null(i) { + builder.append_null(); + continue; + } + + let left_values = left.value(i); + let right_values = right.value(i); + + // Empty array cannot overlap + if left_values.is_empty() || right_values.is_empty() { + builder.append_value(false); + continue; + } + + // DataFusion's make_array(NULL) produces a List with NullArray values. + // NullArray means all elements are null by definition. 
+ if left_values.data_type() == &DataType::Null + || right_values.data_type() == &DataType::Null + { + builder.append_null(); + continue; + } + + let mut found_overlap = false; + let mut has_null = false; + + // Compare each element of left against right with null tracking + for li in 0..left_values.len() { + if left_values.is_null(li) { + has_null = true; + continue; + } + let left_scalar = left_values.slice(li, 1); + + for ri in 0..right_values.len() { + if right_values.is_null(ri) { + has_null = true; + continue; + } + let right_scalar = right_values.slice(ri, 1); + let eq_result = eq(&left_scalar, &right_scalar)?; + if eq_result.is_valid(0) && eq_result.value(0) { + found_overlap = true; + break; + } + } + + if found_overlap { + break; + } + } + + // If we haven't iterated right at all (left was all nulls), + // still need to check right for nulls + if !has_null { + for ri in 0..right_values.len() { + if right_values.is_null(ri) { + has_null = true; + break; + } + } + } + + if found_overlap { + builder.append_value(true); + } else if has_null { + builder.append_null(); + } else { + builder.append_value(false); + } + } + + Ok(Arc::new(builder.finish())) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Int32Array, ListArray}; + use arrow::buffer::{NullBuffer, OffsetBuffer}; + use arrow::datatypes::Field; + + fn make_list_array( + values: &Int32Array, + offsets: &[i32], + nulls: Option, + ) -> ListArray { + ListArray::new( + Arc::new(Field::new("item", DataType::Int32, true)), + OffsetBuffer::new(offsets.to_vec().into()), + Arc::new(values.clone()), + nulls, + ) + } + + #[test] + fn test_basic_overlap() -> Result<()> { + // [1, 2, 3] vs [3, 4, 5] => true + let left = make_list_array(&Int32Array::from(vec![1, 2, 3]), &[0, 3], None); + let right = make_list_array(&Int32Array::from(vec![3, 4, 5]), &[0, 3], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + 
assert_eq!(result.value(0), true); + assert!(result.is_valid(0)); + Ok(()) + } + + #[test] + fn test_no_overlap() -> Result<()> { + // [1, 2] vs [3, 4] => false + let left = make_list_array(&Int32Array::from(vec![1, 2]), &[0, 2], None); + let right = make_list_array(&Int32Array::from(vec![3, 4]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result.value(0), false); + assert!(result.is_valid(0)); + Ok(()) + } + + #[test] + fn test_null_only_overlap() -> Result<()> { + // [1, NULL] vs [NULL, 2] => null (no definite overlap, but nulls present) + let left = make_list_array(&Int32Array::from(vec![Some(1), None]), &[0, 2], None); + let right = make_list_array(&Int32Array::from(vec![None, Some(2)]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } + + #[test] + fn test_null_with_overlap() -> Result<()> { + // [1, NULL] vs [1, 2] => true (definite overlap on 1) + let left = make_list_array(&Int32Array::from(vec![Some(1), None]), &[0, 2], None); + let right = make_list_array(&Int32Array::from(vec![1, 2]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result.value(0), true); + assert!(result.is_valid(0)); + Ok(()) + } + + #[test] + fn test_empty_array() -> Result<()> { + // [1, NULL, 3] vs [] => false + let left = make_list_array( + &Int32Array::from(vec![Some(1), None, Some(3)]), + &[0, 3], + None, + ); + let right = make_list_array(&Int32Array::from(Vec::::new()), &[0, 0], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result.value(0), false); + assert!(result.is_valid(0)); + Ok(()) + } + + #[test] + fn test_null_array() -> Result<()> { + // NULL vs [1, 2] => null + let left 
= make_list_array( + &Int32Array::from(Vec::::new()), + &[0, 0], + Some(NullBuffer::from(vec![false])), + ); + let right = make_list_array(&Int32Array::from(vec![1, 2]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } + + #[test] + fn test_both_null_elements() -> Result<()> { + // [NULL] vs [NULL] => null + let left = make_list_array(&Int32Array::from(vec![None::]), &[0, 1], None); + let right = make_list_array(&Int32Array::from(vec![None::]), &[0, 1], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } + + #[test] + fn test_both_null_elements_via_null_array() -> Result<()> { + // Simulate what DataFusion's make_array(NULL) produces: List with NullArray values + use arrow::array::NullArray; + + let null_values = Arc::new(NullArray::new(1)) as ArrayRef; + let null_field = Arc::new(Field::new("item", DataType::Null, true)); + let left = ListArray::new( + Arc::clone(&null_field), + OffsetBuffer::new(vec![0, 1].into()), + Arc::clone(&null_values), + None, + ); + let right = ListArray::new( + null_field, + OffsetBuffer::new(vec![0, 1].into()), + null_values, + None, + ); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!( + result.is_null(0), + "Expected null for [NULL] vs [NULL] (NullArray representation), got {:?}", + result + ); + Ok(()) + } + + #[test] + fn test_one_null_element_no_overlap() -> Result<()> { + // [3, NULL] vs [1, 2] => null + let left = make_list_array(&Int32Array::from(vec![Some(3), None]), &[0, 2], None); + let right = make_list_array(&Int32Array::from(vec![1, 2]), &[0, 2], None); + + let result = arrays_overlap_list::(&left, &right)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert!(result.is_null(0)); + Ok(()) + } +} diff 
--git a/native/spark-expr/src/array_funcs/mod.rs b/native/spark-expr/src/array_funcs/mod.rs index 2bd1b9631b..55e0d4a22e 100644 --- a/native/spark-expr/src/array_funcs/mod.rs +++ b/native/spark-expr/src/array_funcs/mod.rs @@ -17,12 +17,14 @@ mod array_compact; mod array_insert; +mod arrays_overlap; mod get_array_struct_fields; mod list_extract; mod size; pub use array_compact::SparkArrayCompact; pub use array_insert::ArrayInsert; +pub use arrays_overlap::SparkArraysOverlap; pub use get_array_struct_fields::GetArrayStructFields; pub use list_extract::ListExtract; pub use size::{spark_size, SparkSizeFunc}; diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 9c91bb69c9..8911dda1cc 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -23,8 +23,8 @@ use crate::math_funcs::modulo_expr::spark_modulo; use crate::{ spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan, spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex, - spark_unscaled_value, EvalMode, SparkArrayCompact, SparkContains, SparkDateDiff, - SparkDateTrunc, SparkMakeDate, SparkSizeFunc, + spark_unscaled_value, EvalMode, SparkArrayCompact, SparkArraysOverlap, SparkContains, + SparkDateDiff, SparkDateTrunc, SparkMakeDate, SparkSizeFunc, }; use arrow::datatypes::DataType; use datafusion::common::{DataFusionError, Result as DataFusionResult}; @@ -197,6 +197,7 @@ pub fn create_comet_physical_fun_with_eval_mode( fn all_scalar_functions() -> Vec> { vec![ Arc::new(ScalarUDF::new_from_impl(SparkArrayCompact::default())), + Arc::new(ScalarUDF::new_from_impl(SparkArraysOverlap::default())), Arc::new(ScalarUDF::new_from_impl(SparkContains::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())), Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())), diff --git 
a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index c159c2923c..acd0b2168b 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -255,7 +255,7 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { val rightArrayExprProto = exprToProto(expr.right, inputs, binding) val arraysOverlapScalarExpr = scalarFunctionExprToProtoWithReturnType( - "array_has_any", + "spark_arrays_overlap", BooleanType, false, leftArrayExprProto, diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 1e5d27bed1..34c1d1b368 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -565,7 +565,8 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } test("arrays_overlap - null handling behavior verification") { - withSQLConf("spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + withSQLConf( + "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { withTable("t") { sql("create table t using parquet as select CAST(NULL as array) a1 from range(1)") val data = Seq( From c2a757f4a0e55544a1e7d7c8c9884656c655db51 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 8 Apr 2026 20:15:52 -0700 Subject: [PATCH 08/14] test --- .../comet/CometArrayExpressionSuite.scala | 20 +------------------ 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 34c1d1b368..452dd5d2e0 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala 
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -584,33 +584,15 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp "array(2)", "array()", "array(NULL)", + "array(NULL, NULL)", "a1") for (y <- data; x <- data) { - println(y, x) checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) } } } } - test("s") { - withTable("t") { - sql("CREATE TABLE t(a array, b array) USING parquet") - // sql("INSERT INTO t VALUES (array(1, NULL), array(NULL, 2))") // true -> null - // sql("INSERT INTO t VALUES (array(1, NULL), array(2))") // false -> null - // sql("INSERT INTO t VALUES (array(1, NULL), NULL)") // null -> null - // sql("INSERT INTO t VALUES (array(1, NULL), array(1))") // true -> true - // sql("INSERT INTO t VALUES (array(NULL, 1), array(1))") // true -> true - // sql("INSERT INTO t VALUES (array(NULL, 1), array(1, NULL))") // true -> true - // sql("INSERT INTO t VALUES (array(NULL, 1), array())") // false -> false - sql("INSERT INTO t VALUES (array(NULL, 4), array(1, 2, 3))") // false -> false - sql("SELECT arrays_overlap(a, b) FROM t").show() - checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(a, b) FROM t")) - // sql("SELECT arrays_overlap(array(1, NULL), array(NULL, 2)) FROM t").show() - // checkSparkAnswerAndOperator(sql("SELECT arrays_overlap(array(1, NULL), array(NULL, 2)) FROM t")) - } - } - test("array_compact") { // TODO fix for Spark 4.0.0 assume(!isSpark40Plus) From a76adf3023e65805e6eba23f039c969fd5e4f118 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 8 Apr 2026 20:21:09 -0700 Subject: [PATCH 09/14] fix: workaround arrays_overlap --- .../src/array_funcs/arrays_overlap.rs | 26 +++++++------- .../scala/org/apache/comet/serde/arrays.scala | 36 +------------------ 2 files changed, 14 insertions(+), 48 deletions(-) diff --git a/native/spark-expr/src/array_funcs/arrays_overlap.rs b/native/spark-expr/src/array_funcs/arrays_overlap.rs index 2aef0e46d8..bf96778b49 
100644 --- a/native/spark-expr/src/array_funcs/arrays_overlap.rs +++ b/native/spark-expr/src/array_funcs/arrays_overlap.rs @@ -85,6 +85,18 @@ impl ScalarUDFImpl for SparkArraysOverlap { fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { let [left, right] = take_function_args(self.name(), &args.args)?; + // Return null if either input is a null scalar + if let ColumnarValue::Scalar(s) = &left { + if s.is_null() { + return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None))); + } + } + if let ColumnarValue::Scalar(s) = &right { + if s.is_null() { + return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None))); + } + } + match (left, right) { (ColumnarValue::Array(left_arr), ColumnarValue::Array(right_arr)) => { let result = match (left_arr.data_type(), right_arr.data_type()) { @@ -161,8 +173,7 @@ fn arrays_overlap_list( // DataFusion's make_array(NULL) produces a List with NullArray values. // NullArray means all elements are null by definition. - if left_values.data_type() == &DataType::Null - || right_values.data_type() == &DataType::Null + if left_values.data_type() == &DataType::Null || right_values.data_type() == &DataType::Null { builder.append_null(); continue; @@ -197,17 +208,6 @@ fn arrays_overlap_list( } } - // If we haven't iterated right at all (left was all nulls), - // still need to check right for nulls - if !has_null { - for ri in 0..right_values.len() { - if right_values.is_null(ri) { - has_null = true; - break; - } - } - } - if found_overlap { builder.append_value(true); } else if has_null { diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala b/spark/src/main/scala/org/apache/comet/serde/arrays.scala index acd0b2168b..5c818cae45 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala @@ -260,41 +260,7 @@ object CometArraysOverlap extends CometExpressionSerde[ArraysOverlap] { false, leftArrayExprProto, rightArrayExprProto) - - val leftIsNull = 
createUnaryExpr( - expr, - expr.left, - inputs, - binding, - (builder, unaryExpr) => builder.setIsNull(unaryExpr)) - val rightIsNull = createUnaryExpr( - expr, - expr.right, - inputs, - binding, - (builder, unaryExpr) => builder.setIsNull(unaryExpr)) - - val nullLiteralProto = exprToProto(Literal(null, BooleanType), inputs) - - if (arraysOverlapScalarExpr.isDefined && leftIsNull.isDefined && - rightIsNull.isDefined && nullLiteralProto.isDefined) { - val caseWhenExpr = ExprOuterClass.CaseWhen - .newBuilder() - .addWhen(leftIsNull.get) - .addThen(nullLiteralProto.get) - .addWhen(rightIsNull.get) - .addThen(nullLiteralProto.get) - .setElseExpr(arraysOverlapScalarExpr.get) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setCaseWhen(caseWhenExpr) - .build()) - } else { - withInfo(expr, expr.children: _*) - None - } + optExprWithInfo(arraysOverlapScalarExpr, expr, expr.children: _*) } } From 8469711485910409ec1d67d506a690f4ef5ec192 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 8 Apr 2026 22:52:27 -0700 Subject: [PATCH 10/14] fix: workaround arrays_overlap --- .../src/array_funcs/arrays_overlap.rs | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/native/spark-expr/src/array_funcs/arrays_overlap.rs b/native/spark-expr/src/array_funcs/arrays_overlap.rs index bf96778b49..14423fd5a3 100644 --- a/native/spark-expr/src/array_funcs/arrays_overlap.rs +++ b/native/spark-expr/src/array_funcs/arrays_overlap.rs @@ -25,7 +25,7 @@ //! - null if no definite overlap but either array contains null elements //! 
- false if no overlap and neither array contains null elements -use arrow::array::{Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait}; +use arrow::array::{Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait, Scalar}; use arrow::compute::kernels::cmp::eq; use arrow::datatypes::{DataType, FieldRef}; use datafusion::common::{exec_err, utils::take_function_args, Result, ScalarValue}; @@ -180,37 +180,31 @@ fn arrays_overlap_list( } let mut found_overlap = false; - let mut has_null = false; - // Compare each element of left against right with null tracking - for li in 0..left_values.len() { - if left_values.is_null(li) { - has_null = true; + // Ensure smaller array is on the probe side for fewer broadcast eq calls + let (probe, search) = if left_values.len() <= right_values.len() { + (&left_values, &right_values) + } else { + (&right_values, &left_values) + }; + + // For each non-null element in probe, broadcast eq against the full search array. + // One kernel call per probe element: O(p * s) total work but with vectorized inner loop. 
+ for pi in 0..probe.len() { + if probe.is_null(pi) { continue; } - let left_scalar = left_values.slice(li, 1); - - for ri in 0..right_values.len() { - if right_values.is_null(ri) { - has_null = true; - continue; - } - let right_scalar = right_values.slice(ri, 1); - let eq_result = eq(&left_scalar, &right_scalar)?; - if eq_result.is_valid(0) && eq_result.value(0) { - found_overlap = true; - break; - } - } - - if found_overlap { + let scalar = Scalar::new(probe.slice(pi, 1)); + let eq_result = eq(search, &scalar)?; + if eq_result.true_count() > 0 { + found_overlap = true; break; } } if found_overlap { builder.append_value(true); - } else if has_null { + } else if left_values.null_count() > 0 || right_values.null_count() > 0 { builder.append_null(); } else { builder.append_value(false); From bfafd62181b47ad171399eaab2cf2500f7bf33c3 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 8 Apr 2026 23:31:31 -0700 Subject: [PATCH 11/14] fix: workaround arrays_overlap --- .../src/array_funcs/arrays_overlap.rs | 90 ++++++++++--------- 1 file changed, 48 insertions(+), 42 deletions(-) diff --git a/native/spark-expr/src/array_funcs/arrays_overlap.rs b/native/spark-expr/src/array_funcs/arrays_overlap.rs index 14423fd5a3..91ba78720e 100644 --- a/native/spark-expr/src/array_funcs/arrays_overlap.rs +++ b/native/spark-expr/src/array_funcs/arrays_overlap.rs @@ -25,16 +25,18 @@ //! - null if no definite overlap but either array contains null elements //! 
- false if no overlap and neither array contains null elements -use arrow::array::{Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait, Scalar}; -use arrow::compute::kernels::cmp::eq; -use arrow::datatypes::{DataType, FieldRef}; +use arrow::array::{Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait}; +use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion::common::{exec_err, utils::take_function_args, Result, ScalarValue}; +use datafusion::functions_nested::array_has::ArrayHasAny; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use std::any::Any; use std::sync::Arc; +use super::SparkArrayCompact; + #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkArraysOverlap { signature: Signature, @@ -144,15 +146,48 @@ impl ScalarUDFImpl for SparkArraysOverlap { /// Spark-compatible arrays_overlap with SQL three-valued null logic. /// -/// For each row, compares elements of two list arrays and returns: -/// - null if either array is null -/// - true if any non-null element appears in both arrays -/// - null if no definite overlap but either array contains null elements -/// - false otherwise +/// Strategy: +/// 1. Compact both arrays (remove null elements) using SparkArrayCompact +/// 2. Call DataFusion's array_has_any on the compacted arrays (no null elements = no RowConverter bug) +/// 3. 
Combine: if array_has_any says true → true; if false and either original had nulls → null; else false fn arrays_overlap_list( left: &GenericListArray, right: &GenericListArray, ) -> Result { + let left_arr: ArrayRef = Arc::new(left.clone()); + let right_arr: ArrayRef = Arc::new(right.clone()); + + // Step 1: Compact both arrays to remove null elements + let compact = SparkArrayCompact::new(); + let compacted_left = compact.invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::clone(&left_arr))], + number_rows: left.len(), + return_field: Arc::new(Field::new("", left_arr.data_type().clone(), true)), + arg_fields: vec![], + config_options: Arc::new(datafusion::common::config::ConfigOptions::default()), + })?; + let compacted_right = compact.invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::clone(&right_arr))], + number_rows: right.len(), + return_field: Arc::new(Field::new("", right_arr.data_type().clone(), true)), + arg_fields: vec![], + config_options: Arc::new(datafusion::common::config::ConfigOptions::default()), + })?; + + // Step 2: Call DataFusion's array_has_any on compacted (null-free) arrays. + // No RowConverter NULL=NULL bug since null elements have been removed. 
+ let array_has_any = ArrayHasAny::new(); + let has_any_result = array_has_any.invoke_with_args(ScalarFunctionArgs { + args: vec![compacted_left, compacted_right], + number_rows: left.len(), + return_field: Arc::new(Field::new("", DataType::Boolean, true)), + arg_fields: vec![], + config_options: Arc::new(datafusion::common::config::ConfigOptions::default()), + })?; + let has_any_arr = has_any_result.into_array(left.len())?; + let has_any = has_any_arr.as_any().downcast_ref::().unwrap(); + + // Step 3: Combine array_has_any result with null logic let len = left.len(); let mut builder = BooleanArray::builder(len); @@ -165,44 +200,15 @@ fn arrays_overlap_list( let left_values = left.value(i); let right_values = right.value(i); - // Empty array cannot overlap if left_values.is_empty() || right_values.is_empty() { + // Empty array cannot overlap builder.append_value(false); - continue; - } - - // DataFusion's make_array(NULL) produces a List with NullArray values. - // NullArray means all elements are null by definition. - if left_values.data_type() == &DataType::Null || right_values.data_type() == &DataType::Null + } else if left_values.data_type() == &DataType::Null + || right_values.data_type() == &DataType::Null { + // DataFusion's make_array(NULL) produces List with NullArray values builder.append_null(); - continue; - } - - let mut found_overlap = false; - - // Ensure smaller array is on the probe side for fewer broadcast eq calls - let (probe, search) = if left_values.len() <= right_values.len() { - (&left_values, &right_values) - } else { - (&right_values, &left_values) - }; - - // For each non-null element in probe, broadcast eq against the full search array. - // One kernel call per probe element: O(p * s) total work but with vectorized inner loop. 
- for pi in 0..probe.len() { - if probe.is_null(pi) { - continue; - } - let scalar = Scalar::new(probe.slice(pi, 1)); - let eq_result = eq(search, &scalar)?; - if eq_result.true_count() > 0 { - found_overlap = true; - break; - } - } - - if found_overlap { + } else if has_any.is_valid(i) && has_any.value(i) { builder.append_value(true); } else if left_values.null_count() > 0 || right_values.null_count() > 0 { builder.append_null(); From 476083401b2d01666f5c66e51219b4176cb24c9c Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 8 Apr 2026 23:31:44 -0700 Subject: [PATCH 12/14] Revert "fix: workaround arrays_overlap" This reverts commit bfafd62181b47ad171399eaab2cf2500f7bf33c3. --- .../src/array_funcs/arrays_overlap.rs | 90 +++++++++---------- 1 file changed, 42 insertions(+), 48 deletions(-) diff --git a/native/spark-expr/src/array_funcs/arrays_overlap.rs b/native/spark-expr/src/array_funcs/arrays_overlap.rs index 91ba78720e..14423fd5a3 100644 --- a/native/spark-expr/src/array_funcs/arrays_overlap.rs +++ b/native/spark-expr/src/array_funcs/arrays_overlap.rs @@ -25,18 +25,16 @@ //! - null if no definite overlap but either array contains null elements //! 
- false if no overlap and neither array contains null elements -use arrow::array::{Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait}; -use arrow::datatypes::{DataType, Field, FieldRef}; +use arrow::array::{Array, ArrayRef, BooleanArray, GenericListArray, OffsetSizeTrait, Scalar}; +use arrow::compute::kernels::cmp::eq; +use arrow::datatypes::{DataType, FieldRef}; use datafusion::common::{exec_err, utils::take_function_args, Result, ScalarValue}; -use datafusion::functions_nested::array_has::ArrayHasAny; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, }; use std::any::Any; use std::sync::Arc; -use super::SparkArrayCompact; - #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkArraysOverlap { signature: Signature, @@ -146,48 +144,15 @@ impl ScalarUDFImpl for SparkArraysOverlap { /// Spark-compatible arrays_overlap with SQL three-valued null logic. /// -/// Strategy: -/// 1. Compact both arrays (remove null elements) using SparkArrayCompact -/// 2. Call DataFusion's array_has_any on the compacted arrays (no null elements = no RowConverter bug) -/// 3. 
Combine: if array_has_any says true → true; if false and either original had nulls → null; else false +/// For each row, compares elements of two list arrays and returns: +/// - null if either array is null +/// - true if any non-null element appears in both arrays +/// - null if no definite overlap but either array contains null elements +/// - false otherwise fn arrays_overlap_list( left: &GenericListArray, right: &GenericListArray, ) -> Result { - let left_arr: ArrayRef = Arc::new(left.clone()); - let right_arr: ArrayRef = Arc::new(right.clone()); - - // Step 1: Compact both arrays to remove null elements - let compact = SparkArrayCompact::new(); - let compacted_left = compact.invoke_with_args(ScalarFunctionArgs { - args: vec![ColumnarValue::Array(Arc::clone(&left_arr))], - number_rows: left.len(), - return_field: Arc::new(Field::new("", left_arr.data_type().clone(), true)), - arg_fields: vec![], - config_options: Arc::new(datafusion::common::config::ConfigOptions::default()), - })?; - let compacted_right = compact.invoke_with_args(ScalarFunctionArgs { - args: vec![ColumnarValue::Array(Arc::clone(&right_arr))], - number_rows: right.len(), - return_field: Arc::new(Field::new("", right_arr.data_type().clone(), true)), - arg_fields: vec![], - config_options: Arc::new(datafusion::common::config::ConfigOptions::default()), - })?; - - // Step 2: Call DataFusion's array_has_any on compacted (null-free) arrays. - // No RowConverter NULL=NULL bug since null elements have been removed. 
- let array_has_any = ArrayHasAny::new(); - let has_any_result = array_has_any.invoke_with_args(ScalarFunctionArgs { - args: vec![compacted_left, compacted_right], - number_rows: left.len(), - return_field: Arc::new(Field::new("", DataType::Boolean, true)), - arg_fields: vec![], - config_options: Arc::new(datafusion::common::config::ConfigOptions::default()), - })?; - let has_any_arr = has_any_result.into_array(left.len())?; - let has_any = has_any_arr.as_any().downcast_ref::().unwrap(); - - // Step 3: Combine array_has_any result with null logic let len = left.len(); let mut builder = BooleanArray::builder(len); @@ -200,15 +165,44 @@ fn arrays_overlap_list( let left_values = left.value(i); let right_values = right.value(i); + // Empty array cannot overlap if left_values.is_empty() || right_values.is_empty() { - // Empty array cannot overlap builder.append_value(false); - } else if left_values.data_type() == &DataType::Null - || right_values.data_type() == &DataType::Null + continue; + } + + // DataFusion's make_array(NULL) produces a List with NullArray values. + // NullArray means all elements are null by definition. + if left_values.data_type() == &DataType::Null || right_values.data_type() == &DataType::Null { - // DataFusion's make_array(NULL) produces List with NullArray values builder.append_null(); - } else if has_any.is_valid(i) && has_any.value(i) { + continue; + } + + let mut found_overlap = false; + + // Ensure smaller array is on the probe side for fewer broadcast eq calls + let (probe, search) = if left_values.len() <= right_values.len() { + (&left_values, &right_values) + } else { + (&right_values, &left_values) + }; + + // For each non-null element in probe, broadcast eq against the full search array. + // One kernel call per probe element: O(p * s) total work but with vectorized inner loop. 
+ for pi in 0..probe.len() { + if probe.is_null(pi) { + continue; + } + let scalar = Scalar::new(probe.slice(pi, 1)); + let eq_result = eq(search, &scalar)?; + if eq_result.true_count() > 0 { + found_overlap = true; + break; + } + } + + if found_overlap { builder.append_value(true); } else if left_values.null_count() > 0 || right_values.null_count() > 0 { builder.append_null(); From f06db49067e90b8a615b29e615fd267bd9f08757 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Thu, 9 Apr 2026 18:14:50 -0700 Subject: [PATCH 13/14] fix: workaround arrays_overlap --- native/spark-expr/src/array_funcs/arrays_overlap.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/native/spark-expr/src/array_funcs/arrays_overlap.rs b/native/spark-expr/src/array_funcs/arrays_overlap.rs index 14423fd5a3..094e25c365 100644 --- a/native/spark-expr/src/array_funcs/arrays_overlap.rs +++ b/native/spark-expr/src/array_funcs/arrays_overlap.rs @@ -242,7 +242,7 @@ mod tests { let result = arrays_overlap_list::(&left, &right)?; let result = result.as_any().downcast_ref::().unwrap(); - assert_eq!(result.value(0), true); + assert!(result.value(0)); assert!(result.is_valid(0)); Ok(()) } @@ -255,7 +255,7 @@ mod tests { let result = arrays_overlap_list::(&left, &right)?; let result = result.as_any().downcast_ref::().unwrap(); - assert_eq!(result.value(0), false); + assert!(!result.value(0)); assert!(result.is_valid(0)); Ok(()) } @@ -280,7 +280,7 @@ mod tests { let result = arrays_overlap_list::(&left, &right)?; let result = result.as_any().downcast_ref::().unwrap(); - assert_eq!(result.value(0), true); + assert!(result.value(0)); assert!(result.is_valid(0)); Ok(()) } @@ -297,7 +297,7 @@ mod tests { let result = arrays_overlap_list::(&left, &right)?; let result = result.as_any().downcast_ref::().unwrap(); - assert_eq!(result.value(0), false); + assert!(!result.value(0)); assert!(result.is_valid(0)); Ok(()) } From 32e356bbbeee4a559cd9c71c794bcca8b806b8eb Mon Sep 17 00:00:00 
2001 From: Kazuyuki Tanimura Date: Thu, 9 Apr 2026 18:28:51 -0700 Subject: [PATCH 14/14] fix: workaround arrays_overlap --- .../comet/CometArrayExpressionSuite.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index e86ccb782c..005df7ec56 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -593,6 +593,31 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } } + test("arrays_overlap - nested array null handling behavior verification") { + withSQLConf( + "spark.sql.optimizer.excludedRules" -> "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + withTable("t") { + sql( + "create table t using parquet as select CAST(NULL as array>) a1 from range(1)") + val data = Seq( + "array(array(1, 2), array(3, 4))", + "array(array(1, 2), array(5, 6))", + "array(array(1, 2))", + "array(array(3, 4))", + "array(array(1, NULL))", + "array(array(NULL, 2))", + "array(array(NULL))", + "array(CAST(NULL as array))", + "array(array(1, 2), CAST(NULL as array))", + "array()", + "a1") + for (y <- data; x <- data) { + checkSparkAnswerAndOperator(sql(s"SELECT arrays_overlap($y, $x) from t")) + } + } + } + } + test("array_compact") { // TODO fix for Spark 4.0.0 assume(!isSpark40Plus)