diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java index 87cecdc65d..e1502ff076 100644 --- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java +++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java @@ -130,7 +130,8 @@ public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkT PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName(); LogicalTypeAnnotation logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); - boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get(); + boolean allowTypePromotion = + isSpark40Plus() || (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get(); if (sparkType instanceof NullType) { return; diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 4312bb36cc..e48a696d38 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2147,9 +2147,7 @@ index 8670d95c65e..c7ba51f770f 100644 } } -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { val data = (1 to 4).map(i => Tuple1(i.toString)) val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) @@ -2157,9 +2155,7 @@ index 8670d95c65e..c7ba51f770f 100644 } } -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + test("SPARK-35640: int as long should throw schema incompatible error") { val data = (1 to 4).map(i => Tuple1(i)) val readSchema = 
StructType(Seq(StructField("_1", DataTypes.LongType))) @@ -2189,9 +2185,7 @@ index 29cb224c878..ee5a87fa200 100644 } } -- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2209,9 +2203,7 @@ index 29cb224c878..ee5a87fa200 100644 testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } -- test("SPARK-34212 Parquet should read decimals correctly") { -+ test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + test("SPARK-34212 Parquet should read decimals correctly") { def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } @@ -2239,9 +2231,7 @@ index 29cb224c878..ee5a87fa200 100644 } } -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + test("row group skipping doesn't overflow when reading into larger type") { withTempPath { path => Seq(0).toDF("a").write.parquet(path.toString) // The vectorized and non-vectorized readers will produce different exceptions, we don't need @@ -2330,14 +2320,14 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 
bf5c51b89bb..4e2f0bdb389 100644 +index bf5c51b89bb..44da22178dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2351,26 +2341,86 @@ index bf5c51b89bb..4e2f0bdb389 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" -@@ -1026,7 +1028,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("schema mismatch failure error message for parquet vectorized reader") { -+ test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { +@@ -1029,41 +1031,26 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + test("schema mismatch failure error message for parquet vectorized reader") { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) - assert(e.getCause.isInstanceOf[SparkException]) -@@ -1067,7 +1070,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { +- assert(e.getCause.isInstanceOf[SparkException]) +- assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) +- +- // Check if the physical type is reporting correctly +- val errMsg = e.getCause.getMessage +- assert(errMsg.startsWith("Parquet column cannot be converted in 
file")) +- val file = errMsg.substring("Parquet column cannot be converted in file ".length, +- errMsg.indexOf(". ")) +- val col = spark.read.parquet(file).schema.fields.filter(_.name == "a") +- assert(col.length == 1) +- if (col(0).dataType == StringType) { +- checkError( +- exception = e.getCause.asInstanceOf[SparkException], +- errorClass = "_LEGACY_ERROR_TEMP_2063", +- parameters = Map( +- "filePath" -> +- s".*${dir.getCanonicalPath}.*", +- "column" -> "\\[a\\]", +- "logicalType" -> "int", +- "physicalType" -> "BINARY"), +- matchPVals = true +- ) +- } else { +- checkError( +- exception = e.getCause.asInstanceOf[SparkException], +- errorClass = "_LEGACY_ERROR_TEMP_2063", +- parameters = Map( +- "filePath" -> +- s".*${dir.getCanonicalPath}.*", +- "column" -> "\\[a\\]", +- "logicalType" -> "string", +- "physicalType" -> "INT32"), +- matchPVals = true +- ) ++ // Comet's validation may throw SchemaColumnConvertNotSupportedException directly ++ // (without SparkException wrapper) depending on execution path. 
++ var cause: Throwable = e ++ var scnse: SchemaColumnConvertNotSupportedException = null ++ while (cause != null && scnse == null) { ++ cause match { ++ case s: SchemaColumnConvertNotSupportedException => scnse = s ++ case _ => cause = cause.getCause ++ } + } ++ assert(scnse != null, ++ s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e") ++ ++ // Check the column name is correct ++ assert(scnse.getColumn.contains("a"), ++ s"Expected column 'a' in exception, got: ${scnse.getColumn}") ++ // Physical type should be either BINARY (string file read as int) ++ // or INT32 (int file read as string) ++ assert(scnse.getPhysicalType == "BINARY" || scnse.getPhysicalType == "INT32", ++ s"Unexpected physical type: ${scnse.getPhysicalType}") } } -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => +@@ -1082,8 +1069,16 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + val e = intercept[SparkException] { + spark.read.schema(df2.schema).parquet(s"$path/parquet").collect() + } +- assert(e.getCause.isInstanceOf[SparkException]) +- assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) ++ // Comet may throw SchemaColumnConvertNotSupportedException at different ++ // levels in the cause chain depending on execution path. 
++ var cause: Throwable = e ++ var found = false ++ while (cause != null && !found) { ++ if (cause.isInstanceOf[SchemaColumnConvertNotSupportedException]) found = true ++ cause = cause.getCause ++ } ++ assert(found, ++ s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e") + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index 3a0bd35cb70..b28f06a757f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index e349a94f2d..954b831770 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -1420,7 +1420,7 @@ index 5a413c77754..207b66e1d7b 100644 import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala -index 2f8e401e743..a4f94417dcc 100644 +index 2f8e401e743..dbcf3171946 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -27,9 +27,11 @@ import org.scalatest.time.SpanSugar._ @@ -1782,7 +1782,7 @@ index 2f8e401e743..a4f94417dcc 100644 CostEvaluator.instantiate( classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf) intercept[IllegalArgumentException] { -@@ -2452,6 +2493,7 @@ class AdaptiveQueryExecSuite +@@ -2452,6 +2492,7 @@ class AdaptiveQueryExecSuite val (_, adaptive) = runAdaptiveAndVerifyResult(query) assert(adaptive.collect { case sort: SortExec => sort @@ -1790,7 +1790,7 @@ index 2f8e401e743..a4f94417dcc 100644 }.size == 1) val read = collect(adaptive) { case read: AQEShuffleReadExec => read -@@ -2469,7 +2511,8 @@ class AdaptiveQueryExecSuite +@@ 
-2469,7 +2510,8 @@ class AdaptiveQueryExecSuite } } @@ -1800,7 +1800,7 @@ index 2f8e401e743..a4f94417dcc 100644 withTempView("v") { withSQLConf( SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true", -@@ -2581,7 +2624,7 @@ class AdaptiveQueryExecSuite +@@ -2581,7 +2623,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value2 = value3") val shuffles1 = collect(adaptive1) { @@ -1809,7 +1809,7 @@ index 2f8e401e743..a4f94417dcc 100644 } assert(shuffles1.size == 4) val smj1 = findTopLevelSortMergeJoin(adaptive1) -@@ -2592,7 +2635,7 @@ class AdaptiveQueryExecSuite +@@ -2592,7 +2634,7 @@ class AdaptiveQueryExecSuite runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " + "JOIN skewData3 ON value1 = value3") val shuffles2 = collect(adaptive2) { @@ -1818,7 +1818,7 @@ index 2f8e401e743..a4f94417dcc 100644 } assert(shuffles2.size == 4) val smj2 = findTopLevelSortMergeJoin(adaptive2) -@@ -2850,6 +2893,7 @@ class AdaptiveQueryExecSuite +@@ -2850,6 +2892,7 @@ class AdaptiveQueryExecSuite }.size == (if (firstAccess) 1 else 0)) assert(collect(initialExecutedPlan) { case s: SortExec => s @@ -1826,7 +1826,7 @@ index 2f8e401e743..a4f94417dcc 100644 }.size == (if (firstAccess) 2 else 0)) assert(collect(initialExecutedPlan) { case i: InMemoryTableScanLike => i -@@ -2980,7 +3024,9 @@ class AdaptiveQueryExecSuite +@@ -2980,7 +3023,9 @@ class AdaptiveQueryExecSuite val plan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] assert(plan.inputPlan.isInstanceOf[TakeOrderedAndProjectExec]) @@ -2080,30 +2080,10 @@ index 8e88049f51e..20d7ef7b1bc 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8ed9ef1630e..a865928c1b2 100644 +index 8ed9ef1630e..eed2a6f5ad5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2114,19 +2094,9 @@ index 8ed9ef1630e..a865928c1b2 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..5ea2d938664 100644 +index f6472ba3d9d..7a8f5317ed7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -@@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 1000).map { i => - val ts = new java.sql.Timestamp(i) - Row(ts) @@ -998,7 +999,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -2137,16 +2107,6 @@ index f6472ba3d9d..5ea2d938664 100644 withAllParquetReaders { withTempPath { path => // Repeated values for dictionary encoding. 
-@@ -1051,7 +1053,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") - } - -- test("SPARK-34212 Parquet should read decimals correctly") { -+ test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - def readParquet(schema: String, path: File): DataFrame = { - spark.read.schema(schema).parquet(path.toString) - } @@ -1067,7 +1070,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema, path), df) } @@ -2157,7 +2117,7 @@ index f6472ba3d9d..5ea2d938664 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1089,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1089,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2167,17 +2127,7 @@ index f6472ba3d9d..5ea2d938664 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // The vectorized and non-vectorized readers will produce 
different exceptions, we don't need -@@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1148,7 +1151,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } @@ -2262,14 +2212,14 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 3f47c5e506f..f1ce3194279 100644 +index 3f47c5e506f..a2a522ae8de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type._ import org.apache.spark.SparkException -+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion} ++import org.apache.spark.sql.IgnoreComet import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -2283,26 +2233,87 @@ index 3f47c5e506f..f1ce3194279 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" -@@ -1046,7 +1048,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("schema mismatch failure error message for parquet vectorized reader") { -+ test("schema mismatch failure error message for parquet vectorized reader", -+ 
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { +@@ -1049,41 +1051,27 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + test("schema mismatch failure error message for parquet vectorized reader") { withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) - assert(e.getCause.isInstanceOf[SparkException]) -@@ -1087,7 +1090,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { +- assert(e.getCause.isInstanceOf[SparkException]) +- assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) +- +- // Check if the physical type is reporting correctly +- val errMsg = e.getCause.getMessage +- assert(errMsg.startsWith("Parquet column cannot be converted in file")) +- val file = errMsg.substring("Parquet column cannot be converted in file ".length, +- errMsg.indexOf(". ")) +- val col = spark.read.parquet(file).schema.fields.filter(_.name == "a") +- assert(col.length == 1) +- if (col(0).dataType == StringType) { +- checkError( +- exception = e.getCause.asInstanceOf[SparkException], +- errorClass = "_LEGACY_ERROR_TEMP_2063", +- parameters = Map( +- "filePath" -> +- s".*${dir.getCanonicalPath}.*", +- "column" -> "\\[a\\]", +- "logicalType" -> "int", +- "physicalType" -> "BINARY"), +- matchPVals = true +- ) +- } else { +- checkError( +- exception = e.getCause.asInstanceOf[SparkException], +- errorClass = "_LEGACY_ERROR_TEMP_2063", +- parameters = Map( +- "filePath" -> +- s".*${dir.getCanonicalPath}.*", +- "column" -> "\\[a\\]", +- "logicalType" -> "string", +- "physicalType" -> "INT32"), +- matchPVals = true +- ) ++ // Comet's validation may throw SchemaColumnConvertNotSupportedException directly ++ // (without SparkException wrapper) depending on execution path. ++ // Find SchemaColumnConvertNotSupportedException in the cause chain. 
++ var cause: Throwable = e ++ var scnse: SchemaColumnConvertNotSupportedException = null ++ while (cause != null && scnse == null) { ++ cause match { ++ case s: SchemaColumnConvertNotSupportedException => scnse = s ++ case _ => cause = cause.getCause ++ } + } ++ assert(scnse != null, ++ s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e") ++ ++ // Check the column name is correct ++ assert(scnse.getColumn.contains("a"), ++ s"Expected column 'a' in exception, got: ${scnse.getColumn}") ++ // Physical type should be either BINARY (string file read as int) ++ // or INT32 (int file read as string) ++ assert(scnse.getPhysicalType == "BINARY" || scnse.getPhysicalType == "INT32", ++ s"Unexpected physical type: ${scnse.getPhysicalType}") } } -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => +@@ -1102,8 +1090,16 @@ class ParquetSchemaSuite extends ParquetSchemaTest { + val e = intercept[SparkException] { + spark.read.schema(df2.schema).parquet(s"$path/parquet").collect() + } +- assert(e.getCause.isInstanceOf[SparkException]) +- assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) ++ // Comet may throw SchemaColumnConvertNotSupportedException at different ++ // levels in the cause chain depending on execution path. 
++ var cause: Throwable = e ++ var found = false ++ while (cause != null && !found) { ++ if (cause.isInstanceOf[SchemaColumnConvertNotSupportedException]) found = true ++ cause = cause.getCause ++ } ++ assert(found, ++ s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e") + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index bd42aaa3e0..3b68e0eb5e 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -2835,25 +2835,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ index 4474ec1fd42..05fa0257c82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -39,6 +39,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} - - import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.InternalRow - import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} - import org.apache.spark.sql.catalyst.util.DateTimeUtils -@@ -1059,7 +1060,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val 
readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1344,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1344,7 +1344,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2875,17 +2857,7 @@ index bba71f1c48d..e1b0c25a354 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -318,7 +318,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -2908,16 +2880,6 @@ index bba71f1c48d..e1b0c25a354 100644 } } } -@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") - } - -- test("SPARK-34212 Parquet should read decimals correctly") { -+ test("SPARK-34212 Parquet should read decimals correctly", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - def readParquet(schema: String, path: File): DataFrame = { - spark.read.schema(schema).parquet(path.toString) - } @@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2938,16 +2900,6 @@ index 
bba71f1c48d..e1b0c25a354 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -3053,7 +3005,7 @@ index 0acb21f3e6f..1f9c3fd13fc 100644 import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row} -+import org.apache.spark.sql.{AnalysisException, IgnoreComet, IgnoreCometNativeDataFusion, Row} ++import org.apache.spark.sql.{AnalysisException, IgnoreComet, Row} import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.functions.desc @@ -3067,26 +3019,6 @@ index 0acb21f3e6f..1f9c3fd13fc 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false) val expectedMessage = "Encountered error while reading file" -@@ -1046,7 +1047,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("schema mismatch failure error message for parquet vectorized reader") { -+ 
test("schema mismatch failure error message for parquet vectorized reader", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { dir => - val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) - assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) -@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala index 09ed6955a51..98e313cddd4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index dc0e7099c9..1edadddc45 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -166,13 +166,15 @@ case class CometScanRule(session: SparkSession) return scanExec } - COMET_NATIVE_SCAN_IMPL.get() match { + val result = COMET_NATIVE_SCAN_IMPL.get() match { case SCAN_AUTO | SCAN_NATIVE_DATAFUSION => nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec) case SCAN_NATIVE_ICEBERG_COMPAT => nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec) } + result + case _ => withInfo(scanExec, s"Unsupported relation ${scanExec.relation}") } diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala index ae2d873ef7..78f16490b1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala @@ -147,6 +147,14 @@ case class CometNativeScanExec( // Get file partitions from CometScanExec (handles bucketing, etc.) val filePartitions = scan.getFilePartitions() + // Validate per-file schema compatibility before native execution. + CometScanUtils.validatePerFileSchemaCompatibility( + relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options), + requiredSchema, + relation.partitionSchema.fieldNames.toSet, + relation.sparkSession.sessionState.conf.caseSensitiveAnalysis, + filePartitions) + // Serialize each partition's files import org.apache.comet.serde.operator.partition2Proto val perPartitionBytes = filePartitions.map { filePartition => diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 652fdfc96d..b28a42acc2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -232,6 +232,17 @@ case class CometScanExec( } lazy val inputRDD: RDD[InternalRow] = { + // Validate per-file schema compatibility before reading any data. + // This must run here (not in doExecuteColumnar) because when CometScanExec is + // wrapped by a parent native operator (CometScanWrapper), the parent reads from + // inputRDD directly via JNI and doExecuteColumnar() is never called. 
+ CometScanUtils.validatePerFileSchemaCompatibility( + relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options), + requiredSchema, + relation.partitionSchema.fieldNames.toSet, + relation.sparkSession.sessionState.conf.caseSensitiveAnalysis, + getFilePartitions()) + val options = relation.options + (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString) val readFile: (PartitionedFile) => Iterator[InternalRow] = diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala index 4cd3996669..0b9f435875 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanUtils.scala @@ -19,7 +19,18 @@ package org.apache.spark.sql.comet +import scala.jdk.CollectionConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.schema.Type.Repetition import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression, Literal} +import org.apache.spark.sql.comet.shims.ShimParquetSchemaError +import org.apache.spark.sql.execution.datasources.{FilePartition, SchemaColumnConvertNotSupportedException} +import org.apache.spark.sql.types._ + +import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus +import org.apache.comet.parquet.{FooterReader, TypeUtil} object CometScanUtils { @@ -30,4 +41,130 @@ object CometScanUtils { def filterUnusedDynamicPruningExpressions(predicates: Seq[Expression]): Seq[Expression] = { predicates.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)) } + + /** + * Validate per-file schema compatibility by reading actual Parquet file metadata. + * + * For each file in the scan, reads the Parquet footer and validates each required column + * against the actual file schema. 
This catches mismatches that table-level schema checks miss + * (e.g., when `spark.read.schema(...)` specifies a type incompatible with the file). + * + * Checks structural mismatches (scalar as array, primitive as struct/map) on all Spark + * versions. Primitive columns are also checked via TypeUtil.checkParquetType on all versions; + * before Spark 4.0, when Comet schema evolution is enabled, a resulting + * SchemaColumnConvertNotSupportedException is suppressed rather than raised here. + */ + def validatePerFileSchemaCompatibility( + hadoopConf: Configuration, + requiredSchema: StructType, + partitionColumnNames: Set[String], + caseSensitive: Boolean, + filePartitions: Seq[FilePartition]): Unit = { + + for { + partition <- filePartitions + file <- partition.files + } { + val filePath = file.filePath.toString() + // Read footer; skip files with unsupported Parquet types (TIMESTAMP(NANOS), INTERVAL) + val footerOpt = + try { + Some(FooterReader.readFooter(hadoopConf, file)) + } catch { + case _: Exception => None + } + footerOpt.foreach { footer => + val fileSchema = footer.getFileMetaData.getSchema + + requiredSchema.fields.foreach { field => + val fieldName = field.name + // Skip partition columns - their values come from directory paths, not the file + val isPartitionCol = if (caseSensitive) { + partitionColumnNames.contains(fieldName) + } else { + partitionColumnNames.exists(_.equalsIgnoreCase(fieldName)) + } + if (!isPartitionCol) { + val parquetFieldOpt = { + val fields = fileSchema.getFields.asScala + if (caseSensitive) fields.find(_.getName == fieldName) + else fields.find(_.getName.equalsIgnoreCase(fieldName)) + } + + parquetFieldOpt.foreach { parquetField => + field.dataType match { + case _: ArrayType => + // A REPEATED primitive/group is a valid legacy 2-level Parquet array. + // Only reject when the file has a non-repeated primitive.
+ if (parquetField.isPrimitive && + parquetField.getRepetition != Repetition.REPEATED) { + throwSchemaMismatch( + filePath, + fieldName, + field.dataType.catalogString, + parquetField.asPrimitiveType.getPrimitiveTypeName.toString) + } + + case _: StructType | _: MapType => + if (parquetField.isPrimitive) { + throwSchemaMismatch( + filePath, + fieldName, + field.dataType.catalogString, + parquetField.asPrimitiveType.getPrimitiveTypeName.toString) + } + + case _ => + if (parquetField.isPrimitive) { + // TypeUtil.checkParquetType for primitive type validation. + // On Spark 3.x with schema evolution enabled, suppress SCNSE errors + // since TypeUtil allows extra conversions (Int->Long). + val schemaEvolutionEnabled = + CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get() + val descriptor = + fileSchema.getColumnDescription(Array(parquetField.getName)) + if (descriptor != null) { + try { + TypeUtil.checkParquetType(descriptor, field.dataType) + } catch { + case scnse: SchemaColumnConvertNotSupportedException + if !schemaEvolutionEnabled || isSpark40Plus => + throw ShimParquetSchemaError.parquetColumnMismatchError( + filePath, + fieldName, + field.dataType.catalogString, + scnse.getPhysicalType, + scnse) + case _: SchemaColumnConvertNotSupportedException => + // Schema evolution on Spark 3.x - suppress + case re: RuntimeException => + // TypeUtil.convertErrorForTimestampNTZ throws RuntimeException + // for LTZ→NTZ on Spark 3.x. Preserve original message so tests + // can assert on it (e.g., "Unable to create Parquet converter"). 
+ throw ShimParquetSchemaError.parquetRuntimeError(filePath, re) + } + } + } + // else: complex type for non-Array/Struct/Map - let execution handle + } + } + } + } + } + } + } + + private def throwSchemaMismatch( + filePath: String, + column: String, + expectedType: String, + actualType: String): Unit = { + val scnse = new SchemaColumnConvertNotSupportedException(column, actualType, expectedType) + throw ShimParquetSchemaError.parquetColumnMismatchError( + filePath, + column, + expectedType, + actualType, + scnse) + } } diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala new file mode 100644 index 0000000000..d2607e5e00 --- /dev/null +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + +/** + * Spark 3.4: matches Spark's native cause chain depth. 
+ * + * Spark's FileScanRDD wraps errors as: SparkException (file-level) → SparkException + * (column-level) → SCNSE + * + * Tests assert at different levels: + * - SPARK-35640: getMessage.contains("Parquet column cannot be converted in file") + * - SPARK-34212: getCause.getCause.isInstanceOf[SCNSE] + * - row group skipping: getMessage.contains("Column: [a], Expected: bigint, Found: INT32") + * + * The outer message must include the column details so getMessage() checks pass. + */ +object ShimParquetSchemaError { + def parquetColumnMismatchError( + filePath: String, + column: String, + expectedType: String, + actualType: String, + cause: SchemaColumnConvertNotSupportedException): SparkException = { + val columnMsg = + s"Parquet column cannot be converted in file $filePath. " + + s"Column: [$column], Expected: $expectedType, Found: $actualType" + // Inner SparkException (column-level) — what getCause returns + val inner = new SparkException(columnMsg, cause) + // Outer SparkException (file-level) — includes column details in message + // so getMessage().contains() checks work at any level + new SparkException(columnMsg, inner) + } + + /** Wrap a RuntimeException (e.g., from TypeUtil.convertErrorForTimestampNTZ) */ + def parquetRuntimeError(filePath: String, cause: RuntimeException): SparkException = { + new SparkException(cause.getMessage, cause) + } +} diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala new file mode 100644 index 0000000000..3346af4f70 --- /dev/null +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.shims + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + +/** + * Spark 3.5: matches Spark's native cause chain depth. + * + * Spark's FileScanRDD wraps errors as: SparkException (file-level) → SparkException + * (column-level) → SCNSE + * + * Tests assert at different levels: + * - SPARK-35640: getMessage.contains("Parquet column cannot be converted in file") + * - SPARK-34212: getCause.getCause.isInstanceOf[SCNSE] + * - row group skipping: getMessage.contains("Column: [a], Expected: bigint, Found: INT32") + * + * The outer message must include the column details so getMessage() checks pass. + */ +object ShimParquetSchemaError { + def parquetColumnMismatchError( + filePath: String, + column: String, + expectedType: String, + actualType: String, + cause: SchemaColumnConvertNotSupportedException): SparkException = { + val columnMsg = + s"Parquet column cannot be converted in file $filePath. 
" + + s"Column: [$column], Expected: $expectedType, Found: $actualType" + // Inner SparkException (column-level) — what getCause returns + val inner = new SparkException(columnMsg, cause) + // Outer SparkException (file-level) — includes column details in message + // so getMessage().contains() checks work at any level + new SparkException(columnMsg, inner) + } + + /** Wrap a RuntimeException (e.g., from TypeUtil.convertErrorForTimestampNTZ) */ + def parquetRuntimeError(filePath: String, cause: RuntimeException): SparkException = { + new SparkException(cause.getMessage, cause) + } +} diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala new file mode 100644 index 0000000000..bef16c211b --- /dev/null +++ b/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimParquetSchemaError.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.comet.shims + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException + +/** + * Spark 4.0: uses structured error class with message parameters. + */ +object ShimParquetSchemaError { + def parquetColumnMismatchError( + filePath: String, + column: String, + expectedType: String, + actualType: String, + cause: SchemaColumnConvertNotSupportedException): SparkException = { + new SparkException( + errorClass = "FAILED_READ_FILE.PARQUET_COLUMN_DATA_TYPE_MISMATCH", + messageParameters = Map( + "path" -> filePath, + "column" -> s"[$column]", + "expectedType" -> expectedType, + "actualType" -> actualType), + cause = cause) + } + + /** Wrap a RuntimeException (e.g., from TypeUtil.convertErrorForTimestampNTZ) */ + def parquetRuntimeError(filePath: String, cause: RuntimeException): SparkException = { + new SparkException(cause.getMessage, cause) + } +} diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 75ac889228..c9d3c2e7e0 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -1576,4 +1577,113 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { } } + test("SPARK-35640: read binary as timestamp should throw schema 
incompatible error") { + val data = (1 to 4).map(i => Tuple1(i.toString)) + val readSchema = StructType(Seq(StructField("_1", TimestampType))) + + withParquetDataFrame(data) { _ => + withTempPath { dir => + spark + .createDataFrame( + spark.sparkContext.parallelize(data.map(Row.fromTuple)), + StructType(Seq(StructField("_1", StringType)))) + .write + .parquet(dir.getCanonicalPath) + + val e = intercept[SparkException] { + spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect() + } + // Verify SchemaColumnConvertNotSupportedException is somewhere in the cause chain + var cause: Throwable = e + var found = false + while (cause != null && !found) { + if (cause.isInstanceOf[SchemaColumnConvertNotSupportedException]) { + found = true + } + cause = cause.getCause + } + assert( + found, + s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e") + } + } + } + + test("SPARK-45604: schema mismatch on timestamp_ntz to array") { + import org.apache.spark.sql.functions.lit + withTempPath { dir => + val path = dir.getCanonicalPath + // Write a file with scalar timestamp_ntz column + val df1 = spark + .range(1) + .selectExpr("CAST(id AS INT) AS _1") + .withColumn("_2", lit("2024-01-01T00:00:00").cast(TimestampNTZType)) + // Write another file with array column + val arraySchema = StructType( + Seq(StructField("_1", IntegerType), StructField("_2", ArrayType(TimestampNTZType)))) + val df2Row = Row(2, Array(java.time.LocalDateTime.of(2024, 1, 1, 0, 0))) + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(df2Row)), arraySchema) + df1.write.mode("overwrite").parquet(s"$path/parquet") + df2.write.mode("append").parquet(s"$path/parquet") + + val e = intercept[SparkException] { + spark.read.schema(arraySchema).parquet(s"$path/parquet").collect() + } + var cause: Throwable = e + var found = false + while (cause != null && !found) { + if (cause.isInstanceOf[SchemaColumnConvertNotSupportedException]) { + found = true + } + cause = 
cause.getCause + } + assert(found, s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e") + } + } + + test("schema mismatch: Comet should match Spark behavior for incompatible type reads") { + // Spark 4 is more permissive than Spark 3 for some of these, so we verify Comet + // matches Spark rather than asserting a specific outcome. + val cases: Seq[(DataType, DataType, String)] = Seq( + (IntegerType, StringType, "int-as-string"), + (StringType, IntegerType, "string-as-int"), + (BooleanType, IntegerType, "boolean-as-int"), + (IntegerType, TimestampType, "int-as-timestamp"), + (DoubleType, IntegerType, "double-as-int")) + + Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { + scanMode => + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode) { + cases.foreach { case (writeType, readType, desc) => + withTempPath { path => + val writeSchema = StructType(Seq(StructField("col", writeType, true))) + val rows = (0 until 10).map { i => + val v: Any = writeType match { + case IntegerType => i + case StringType => s"str_$i" + case BooleanType => i % 2 == 0 + case DoubleType => i.toDouble + } + Row(v) + } + spark + .createDataFrame(spark.sparkContext.parallelize(rows), writeSchema) + .write + .parquet(path.getCanonicalPath) + + val readSchema = StructType(Seq(StructField("col", readType, true))) + readParquetFile(path.getCanonicalPath, Some(readSchema)) { df => + val (sparkError, cometError) = checkSparkAnswerMaybeThrows(df) + assert( + sparkError.isDefined == cometError.isDefined, + s"[$scanMode] $desc: Spark " + + s"${if (sparkError.isDefined) "errored" else "succeeded"}" + + s" but Comet ${if (cometError.isDefined) "errored" else "succeeded"}") + } + } + } + } + } + } + }