From f989f7cc496c902773703f73f68eced9195d3dc1 Mon Sep 17 00:00:00 2001 From: Matthew Alex Date: Fri, 10 Apr 2026 21:20:50 +0100 Subject: [PATCH 1/3] fix: handle ambiguous and non-existent local times (#3865) * fix: choose earliest time when ambiguous * fix: handle non-existent times due to time change * test: add unit tests for ambiguous and non-existent local times * refactor: remove unnecessary catch_unwind in tests * test: add spark test for timestampntz dst casting * refactor: remove unwrap from resolve_local_datetime --------- Co-authored-by: Matthew Alex --- native/spark-expr/src/utils.rs | 81 ++++++++++++- .../comet/CometTemporalExpressionSuite.scala | 111 ++++++++++++++++++ 2 files changed, 187 insertions(+), 5 deletions(-) diff --git a/native/spark-expr/src/utils.rs b/native/spark-expr/src/utils.rs index 613f55cf77..dc9147e2bd 100644 --- a/native/spark-expr/src/utils.rs +++ b/native/spark-expr/src/utils.rs @@ -36,7 +36,7 @@ use arrow::{ array::{as_dictionary_array, Array, ArrayRef, PrimitiveArray}, temporal_conversions::as_datetime, }; -use chrono::{DateTime, Offset, TimeZone}; +use chrono::{DateTime, LocalResult, NaiveDateTime, Offset, TimeZone}; /// Preprocesses input arrays to add timezone information from Spark to Arrow array datatype or /// to apply timezone offset. @@ -174,6 +174,34 @@ fn datetime_cast_err(value: i64) -> ArrowError { )) } +/// Resolves a local datetime in the given timezone to an absolute DateTime, +/// handling DST ambiguity and spring-forward gaps. +/// Parameters: +/// tz - timezone used to interpret local_datetime +/// local_datetime - a naive local datetime to resolve +fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime { + match tz.from_local_datetime(&local_datetime) { + LocalResult::Single(dt) => dt, + LocalResult::Ambiguous(dt, _) => dt, + LocalResult::None => { + // Determine offset before time-change + let probe = local_datetime - chrono::Duration::hours(3); + let pre_offset = match tz.from_local_datetime(&probe) { + LocalResult::Single(dt) => dt.offset().fix(), + LocalResult::Ambiguous(dt, _) => dt.offset().fix(), + LocalResult::None => { + // Cannot determine offset; fall back to UTC interpretation + return local_datetime.and_utc().with_timezone(tz); + } + }; + let offset_secs = pre_offset.local_minus_utc() as i64; + + let utc_naive = local_datetime - chrono::Duration::seconds(offset_secs); + utc_naive.and_utc().with_timezone(tz) + } + } +} + /// Takes in a Timestamp(Microsecond, None) array and a timezone id, and returns /// a Timestamp(Microsecond, Some<_>) array. /// The understanding is that the input array has time in the timezone specified in the second @@ -196,8 +224,8 @@ fn timestamp_ntz_to_timestamp( as_datetime::(value) .ok_or_else(|| datetime_cast_err(value)) .map(|local_datetime| { - let datetime: DateTime = - tz.from_local_datetime(&local_datetime).unwrap(); + let datetime = resolve_local_datetime(&tz, local_datetime); + datetime.timestamp_micros() }) })?; @@ -215,8 +243,8 @@ fn timestamp_ntz_to_timestamp( as_datetime::(value) .ok_or_else(|| datetime_cast_err(value)) .map(|local_datetime| { - let datetime: DateTime = - tz.from_local_datetime(&local_datetime).unwrap(); + let datetime = resolve_local_datetime(&tz, local_datetime); + datetime.timestamp_millis() }) })?; @@ -312,6 +340,19 @@ pub fn unlikely(b: bool) -> bool { mod tests { use super::*; + fn array_containing(local_datetime: &str) -> ArrayRef { + let dt = NaiveDateTime::parse_from_str(local_datetime, "%Y-%m-%d %H:%M:%S").unwrap(); + let ts = dt.and_utc().timestamp_micros(); + Arc::new(TimestampMicrosecondArray::from(vec![ts])) + } + + fn micros_for(datetime: &str) -> i64 { + NaiveDateTime::parse_from_str(datetime, "%Y-%m-%d %H:%M:%S") + .unwrap() + .and_utc() + .timestamp_micros() + } + #[test] fn test_build_bool_state() { let mut builder = BooleanBufferBuilder::new(0); @@ -330,4 +371,34 @@ mod tests { ); assert_eq!(last, build_bool_state(&mut builder, &EmitTo::All)); } + + #[test] + fn test_timestamp_ntz_to_timestamp_handles_non_existent_time() { + let output = timestamp_ntz_to_timestamp( + array_containing("2024-03-31 01:30:00"), + "Europe/London", + None, + ) + .unwrap(); + + assert_eq!( + as_primitive_array::(&output).value(0), + micros_for("2024-03-31 01:30:00") + ); + } + + #[test] + fn test_timestamp_ntz_to_timestamp_handles_ambiguous_time() { + let output = timestamp_ntz_to_timestamp( + array_containing("2024-10-27 01:30:00"), + "Europe/London", + None, + ) + .unwrap(); + + assert_eq!( + as_primitive_array::(&output).value(0), + micros_for("2024-10-27 00:30:00") + ); + } } diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 1ae6926e05..e75070e800 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -395,4 +395,115 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH // Test null handling checkSparkAnswerAndOperator("SELECT unix_date(NULL)") } + + /** + * Checks that the Comet-evaluated DataFrame produces the same results as the baseline DataFrame + * evaluated by native Spark JVM, and that Comet native operators are used. This is needed + * because Days is a PartitionTransformExpression that extends Unevaluable, so + * checkSparkAnswerAndOperator cannot be used directly. + */ + private def checkDays(cometDF: DataFrame, baselineDF: DataFrame): Unit = { + // Ensure the expected answer is evaluated solely by native Spark JVM (Comet off) + var expected: Array[Row] = Array.empty + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + expected = baselineDF.collect() + } + checkAnswer(cometDF, expected.toSeq) + checkCometOperators(stripAQEPlan(cometDF.queryExecution.executedPlan)) + } + + test("days - date input") { + val r = new Random(42) + val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) + val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 1000, DataGenOptions()) + + checkDays( + dateDF.select(col("d"), getColumnFromExpression(Days(UnresolvedAttribute("d")))), + dateDF.selectExpr("d", "unix_date(d)")) + } + + test("days - timestamp input") { + val r = new Random(42) + val tsSchema = StructType(Seq(StructField("ts", DataTypes.TimestampType, true))) + val tsDF = FuzzDataGenerator.generateDataFrame(r, spark, tsSchema, 1000, DataGenOptions()) + + for (timezone <- Seq("UTC", "America/Los_Angeles", "Asia/Tokyo")) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { + checkDays( + tsDF.select(col("ts"), getColumnFromExpression(Days(UnresolvedAttribute("ts")))), + tsDF.selectExpr("ts", "unix_date(cast(ts as date))")) + } + } + } + + test("days - literal edge cases") { + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + + val dummyDF = spark.range(1) + + // Pre-epoch (should return negative day numbers) + checkDays( + dummyDF.select( + getColumnFromExpression( + Days(Literal.create(java.sql.Date.valueOf("1969-12-31"), DataTypes.DateType))), + getColumnFromExpression( + Days(Literal.create(java.sql.Date.valueOf("1960-01-01"), DataTypes.DateType)))), + dummyDF.selectExpr("unix_date(DATE('1969-12-31'))", "unix_date(DATE('1960-01-01'))")) + + // Epoch and post-epoch + checkDays( + dummyDF.select( + getColumnFromExpression( + Days(Literal.create(java.sql.Date.valueOf("1970-01-01"), DataTypes.DateType))), + getColumnFromExpression( + Days(Literal.create(java.sql.Date.valueOf("1970-01-02"), DataTypes.DateType))), + getColumnFromExpression( + Days(Literal.create(java.sql.Date.valueOf("2024-01-01"), DataTypes.DateType)))), + dummyDF.selectExpr( + "unix_date(DATE('1970-01-01'))", + "unix_date(DATE('1970-01-02'))", + "unix_date(DATE('2024-01-01'))")) + + // Timestamp literals + checkDays( + dummyDF.select( + getColumnFromExpression(Days(Literal + .create(java.sql.Timestamp.valueOf("1970-01-01 00:00:00"), DataTypes.TimestampType))), + getColumnFromExpression( + Days( + Literal.create( + java.sql.Timestamp.valueOf("2024-06-15 10:30:00"), + DataTypes.TimestampType)))), + dummyDF.selectExpr( + "unix_date(cast(TIMESTAMP('1970-01-01 00:00:00') as date))", + "unix_date(cast(TIMESTAMP('2024-06-15 10:30:00') as date))")) + + // Null handling + checkDays( + dummyDF.select(getColumnFromExpression(Days(Literal.create(null, DataTypes.DateType)))), + dummyDF.selectExpr("unix_date(cast(NULL as date))")) + } + } + + test("cast TimestampNTZ to Timestamp - DST edge cases") { + val data = Seq( + Row(java.time.LocalDateTime.parse("2024-03-31T01:30:00")), // Spring forward (Europe/London) + Row(java.time.LocalDateTime.parse("2024-10-27T01:30:00")) // Fall back (Europe/London) + ) + val schema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("dst_tbl") + + // We `allowIncompatible` here because casts involving TimestampNTZ are marked + // as Incompatible (due to incorrect behaviour when casting from a string) + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Europe/London", + "spark.comet.expression.Cast.allowIncompatible" -> "true") { + checkSparkAnswerAndOperator( + "SELECT ts_ntz, CAST(ts_ntz AS TIMESTAMP) FROM dst_tbl ORDER BY ts_ntz") + } + } } From f877d33df1e54c157b6d78afd38985147ff152c5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 11:55:40 -0600 Subject: [PATCH 2/3] fix: add missing imports in CometTemporalExpressionSuite --- .../org/apache/comet/CometTemporalExpressionSuite.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index e75070e800..fd3e7877a7 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -21,8 +21,11 @@ package org.apache.comet import scala.util.Random -import org.apache.spark.sql.{CometTestBase, Row, SaveMode} +import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Days, Literal} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructField, StructType} From 07c9a281b7bcede2796291772fbf9320f05edd0d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 13 Apr 2026 14:43:39 -0600 Subject: [PATCH 3/3] fix: remove Days tests not applicable to branch-0.14 The cherry-pick of #3865 inadvertently included Days expression tests from #3746, which is not present on branch-0.14. --- .../comet/CometTemporalExpressionSuite.scala | 96 +------------------ 1 file changed, 1 insertion(+), 95 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index fd3e7877a7..cb9987fbe7 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -21,11 +21,8 @@ package org.apache.comet import scala.util.Random -import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode} -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.{Days, Literal} +import org.apache.spark.sql.{CometTestBase, Row, SaveMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, StructField, StructType} @@ -399,97 +396,6 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH checkSparkAnswerAndOperator("SELECT unix_date(NULL)") } - /** - * Checks that the Comet-evaluated DataFrame produces the same results as the baseline DataFrame - * evaluated by native Spark JVM, and that Comet native operators are used. This is needed - * because Days is a PartitionTransformExpression that extends Unevaluable, so - * checkSparkAnswerAndOperator cannot be used directly. - */ - private def checkDays(cometDF: DataFrame, baselineDF: DataFrame): Unit = { - // Ensure the expected answer is evaluated solely by native Spark JVM (Comet off) - var expected: Array[Row] = Array.empty - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { - expected = baselineDF.collect() - } - checkAnswer(cometDF, expected.toSeq) - checkCometOperators(stripAQEPlan(cometDF.queryExecution.executedPlan)) - } - - test("days - date input") { - val r = new Random(42) - val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) - val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 1000, DataGenOptions()) - - checkDays( - dateDF.select(col("d"), getColumnFromExpression(Days(UnresolvedAttribute("d")))), - dateDF.selectExpr("d", "unix_date(d)")) - } - - test("days - timestamp input") { - val r = new Random(42) - val tsSchema = StructType(Seq(StructField("ts", DataTypes.TimestampType, true))) - val tsDF = FuzzDataGenerator.generateDataFrame(r, spark, tsSchema, 1000, DataGenOptions()) - - for (timezone <- Seq("UTC", "America/Los_Angeles", "Asia/Tokyo")) { - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { - checkDays( - tsDF.select(col("ts"), getColumnFromExpression(Days(UnresolvedAttribute("ts")))), - tsDF.selectExpr("ts", "unix_date(cast(ts as date))")) - } - } - } - - test("days - literal edge cases") { - withSQLConf( - SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> - "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { - - val dummyDF = spark.range(1) - - // Pre-epoch (should return negative day numbers) - checkDays( - dummyDF.select( - getColumnFromExpression( - Days(Literal.create(java.sql.Date.valueOf("1969-12-31"), DataTypes.DateType))), - getColumnFromExpression( - Days(Literal.create(java.sql.Date.valueOf("1960-01-01"), DataTypes.DateType)))), - dummyDF.selectExpr("unix_date(DATE('1969-12-31'))", "unix_date(DATE('1960-01-01'))")) - - // Epoch and post-epoch - checkDays( - dummyDF.select( - getColumnFromExpression( - Days(Literal.create(java.sql.Date.valueOf("1970-01-01"), DataTypes.DateType))), - getColumnFromExpression( - Days(Literal.create(java.sql.Date.valueOf("1970-01-02"), DataTypes.DateType))), - getColumnFromExpression( - Days(Literal.create(java.sql.Date.valueOf("2024-01-01"), DataTypes.DateType)))), - dummyDF.selectExpr( - "unix_date(DATE('1970-01-01'))", - "unix_date(DATE('1970-01-02'))", - "unix_date(DATE('2024-01-01'))")) - - // Timestamp literals - checkDays( - dummyDF.select( - getColumnFromExpression(Days(Literal - .create(java.sql.Timestamp.valueOf("1970-01-01 00:00:00"), DataTypes.TimestampType))), - getColumnFromExpression( - Days( - Literal.create( - java.sql.Timestamp.valueOf("2024-06-15 10:30:00"), - DataTypes.TimestampType)))), - dummyDF.selectExpr( - "unix_date(cast(TIMESTAMP('1970-01-01 00:00:00') as date))", - "unix_date(cast(TIMESTAMP('2024-06-15 10:30:00') as date))")) - - // Null handling - checkDays( - dummyDF.select(getColumnFromExpression(Days(Literal.create(null, DataTypes.DateType)))), - dummyDF.selectExpr("unix_date(cast(NULL as date))")) - } - } - test("cast TimestampNTZ to Timestamp - DST edge cases") { val data = Seq( Row(java.time.LocalDateTime.parse("2024-03-31T01:30:00")), // Spring forward (Europe/London)