Skip to content
3 changes: 2 additions & 1 deletion common/src/main/java/org/apache/comet/parquet/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkT
PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
LogicalTypeAnnotation logicalTypeAnnotation =
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
boolean allowTypePromotion =
isSpark40Plus() || (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();

if (sparkType instanceof NullType) {
return;
Expand Down
114 changes: 82 additions & 32 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -2147,19 +2147,15 @@ index 8670d95c65e..c7ba51f770f 100644
}
}

- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") {
+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
test("SPARK-35640: read binary as timestamp should throw schema incompatible error") {
val data = (1 to 4).map(i => Tuple1(i.toString))
val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType)))

@@ -1075,7 +1077,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

- test("SPARK-35640: int as long should throw schema incompatible error") {
+ test("SPARK-35640: int as long should throw schema incompatible error",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
test("SPARK-35640: int as long should throw schema incompatible error") {
val data = (1 to 4).map(i => Tuple1(i))
val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType)))

Expand Down Expand Up @@ -2189,9 +2185,7 @@ index 29cb224c878..ee5a87fa200 100644
}
}

- test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
+ test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
val data = (1 to 1000).map { i =>
val ts = new java.sql.Timestamp(i)
Row(ts)
Expand All @@ -2209,9 +2203,7 @@ index 29cb224c878..ee5a87fa200 100644
testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
}

- test("SPARK-34212 Parquet should read decimals correctly") {
+ test("SPARK-34212 Parquet should read decimals correctly",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
test("SPARK-34212 Parquet should read decimals correctly") {
def readParquet(schema: String, path: File): DataFrame = {
spark.read.schema(schema).parquet(path.toString)
}
Expand Down Expand Up @@ -2239,9 +2231,7 @@ index 29cb224c878..ee5a87fa200 100644
}
}

- test("row group skipping doesn't overflow when reading into larger type") {
+ test("row group skipping doesn't overflow when reading into larger type",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
test("row group skipping doesn't overflow when reading into larger type") {
withTempPath { path =>
Seq(0).toDF("a").write.parquet(path.toString)
// The vectorized and non-vectorized readers will produce different exceptions, we don't need
Expand Down Expand Up @@ -2330,14 +2320,14 @@ index 5c0b7def039..151184bc98c 100644
assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index bf5c51b89bb..4e2f0bdb389 100644
index bf5c51b89bb..44da22178dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type._

import org.apache.spark.SparkException
+import org.apache.spark.sql.{IgnoreComet, IgnoreCometNativeDataFusion}
+import org.apache.spark.sql.IgnoreComet
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.desc
Expand All @@ -2351,26 +2341,86 @@ index bf5c51b89bb..4e2f0bdb389 100644
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = false)
val expectedMessage = "Encountered error while reading file"
@@ -1026,7 +1028,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}

- test("schema mismatch failure error message for parquet vectorized reader") {
+ test("schema mismatch failure error message for parquet vectorized reader",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
@@ -1029,41 +1031,26 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
test("schema mismatch failure error message for parquet vectorized reader") {
withTempPath { dir =>
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[SparkException])
@@ -1067,7 +1070,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
- assert(e.getCause.isInstanceOf[SparkException])
- assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
- // Check if the physical type is reporting correctly
- val errMsg = e.getCause.getMessage
- assert(errMsg.startsWith("Parquet column cannot be converted in file"))
- val file = errMsg.substring("Parquet column cannot be converted in file ".length,
- errMsg.indexOf(". "))
- val col = spark.read.parquet(file).schema.fields.filter(_.name == "a")
- assert(col.length == 1)
- if (col(0).dataType == StringType) {
- checkError(
- exception = e.getCause.asInstanceOf[SparkException],
- errorClass = "_LEGACY_ERROR_TEMP_2063",
- parameters = Map(
- "filePath" ->
- s".*${dir.getCanonicalPath}.*",
- "column" -> "\\[a\\]",
- "logicalType" -> "int",
- "physicalType" -> "BINARY"),
- matchPVals = true
- )
- } else {
- checkError(
- exception = e.getCause.asInstanceOf[SparkException],
- errorClass = "_LEGACY_ERROR_TEMP_2063",
- parameters = Map(
- "filePath" ->
- s".*${dir.getCanonicalPath}.*",
- "column" -> "\\[a\\]",
- "logicalType" -> "string",
- "physicalType" -> "INT32"),
- matchPVals = true
- )
+ // Comet's validation may throw SchemaColumnConvertNotSupportedException directly
+ // (without SparkException wrapper) depending on execution path.
+ var cause: Throwable = e
+ var scnse: SchemaColumnConvertNotSupportedException = null
+ while (cause != null && scnse == null) {
+ cause match {
+ case s: SchemaColumnConvertNotSupportedException => scnse = s
+ case _ => cause = cause.getCause
+ }
}
+ assert(scnse != null,
+ s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e")
+
+ // Check the column name is correct
+ assert(scnse.getColumn.contains("a"),
+ s"Expected column 'a' in exception, got: ${scnse.getColumn}")
+ // Physical type should be either BINARY (string file read as int)
+ // or INT32 (int file read as string)
+ assert(scnse.getPhysicalType == "BINARY" || scnse.getPhysicalType == "INT32",
+ s"Unexpected physical type: ${scnse.getPhysicalType}")
}
}

- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array<timestamp_ntz>") {
+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array<timestamp_ntz>",
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
import testImplicits._

withTempPath { dir =>
@@ -1082,8 +1069,16 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
val e = intercept[SparkException] {
spark.read.schema(df2.schema).parquet(s"$path/parquet").collect()
}
- assert(e.getCause.isInstanceOf[SparkException])
- assert(e.getCause.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ // Comet may throw SchemaColumnConvertNotSupportedException at different
+ // levels in the cause chain depending on execution path.
+ var cause: Throwable = e
+ var found = false
+ while (cause != null && !found) {
+ if (cause.isInstanceOf[SchemaColumnConvertNotSupportedException]) found = true
+ cause = cause.getCause
+ }
+ assert(found,
+ s"Expected SchemaColumnConvertNotSupportedException in cause chain, got: $e")
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 3a0bd35cb70..b28f06a757f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
Expand Down
Loading