diff --git a/native/spark-expr/src/utils.rs b/native/spark-expr/src/utils.rs index 613f55cf77..dc9147e2bd 100644 --- a/native/spark-expr/src/utils.rs +++ b/native/spark-expr/src/utils.rs @@ -36,7 +36,7 @@ use arrow::{ array::{as_dictionary_array, Array, ArrayRef, PrimitiveArray}, temporal_conversions::as_datetime, }; -use chrono::{DateTime, Offset, TimeZone}; +use chrono::{DateTime, LocalResult, NaiveDateTime, Offset, TimeZone}; /// Preprocesses input arrays to add timezone information from Spark to Arrow array datatype or /// to apply timezone offset. @@ -174,6 +174,34 @@ fn datetime_cast_err(value: i64) -> ArrowError { )) } +/// Resolves a local datetime in the given timezone to an absolute DateTime, +/// handling DST ambiguity and spring-forward gaps. +/// Parameters: +/// tz - timezone used to interpret local_datetime +/// local_datetime - a naive local datetime to resolve +fn resolve_local_datetime(tz: &Tz, local_datetime: NaiveDateTime) -> DateTime<Tz> { match tz.from_local_datetime(&local_datetime) { LocalResult::Single(dt) => dt, LocalResult::Ambiguous(dt, _) => dt, LocalResult::None => { // Determine offset before time-change let probe = local_datetime - chrono::Duration::hours(3); let pre_offset = match tz.from_local_datetime(&probe) { LocalResult::Single(dt) => dt.offset().fix(), LocalResult::Ambiguous(dt, _) => dt.offset().fix(), LocalResult::None => { // Cannot determine offset; fall back to UTC interpretation return local_datetime.and_utc().with_timezone(tz); } }; let offset_secs = pre_offset.local_minus_utc() as i64; let utc_naive = local_datetime - chrono::Duration::seconds(offset_secs); utc_naive.and_utc().with_timezone(tz) } } } /// Takes in a Timestamp(Microsecond, None) array and a timezone id, and returns /// a Timestamp(Microsecond, Some<_>) array. 
/// The understanding is that the input array has time in the timezone specified in the second @@ -196,8 +224,8 @@ fn timestamp_ntz_to_timestamp( as_datetime::<TimestampMicrosecondType>(value) .ok_or_else(|| datetime_cast_err(value)) .map(|local_datetime| { - let datetime: DateTime<Tz> = - tz.from_local_datetime(&local_datetime).unwrap(); + let datetime = resolve_local_datetime(&tz, local_datetime); + datetime.timestamp_micros() }) })?; @@ -215,8 +243,8 @@ fn timestamp_ntz_to_timestamp( as_datetime::<TimestampMillisecondType>(value) .ok_or_else(|| datetime_cast_err(value)) .map(|local_datetime| { - let datetime: DateTime<Tz> = - tz.from_local_datetime(&local_datetime).unwrap(); + let datetime = resolve_local_datetime(&tz, local_datetime); + datetime.timestamp_millis() }) })?; @@ -312,6 +340,19 @@ pub fn unlikely(b: bool) -> bool { mod tests { use super::*; + fn array_containing(local_datetime: &str) -> ArrayRef { + let dt = NaiveDateTime::parse_from_str(local_datetime, "%Y-%m-%d %H:%M:%S").unwrap(); + let ts = dt.and_utc().timestamp_micros(); + Arc::new(TimestampMicrosecondArray::from(vec![ts])) + } + + fn micros_for(datetime: &str) -> i64 { + NaiveDateTime::parse_from_str(datetime, "%Y-%m-%d %H:%M:%S") + .unwrap() + .and_utc() + .timestamp_micros() + } + #[test] fn test_build_bool_state() { let mut builder = BooleanBufferBuilder::new(0); @@ -330,4 +371,34 @@ mod tests { ); assert_eq!(last, build_bool_state(&mut builder, &EmitTo::All)); } + + #[test] + fn test_timestamp_ntz_to_timestamp_handles_non_existent_time() { + let output = timestamp_ntz_to_timestamp( + array_containing("2024-03-31 01:30:00"), + "Europe/London", + None, + ) + .unwrap(); + + assert_eq!( + as_primitive_array::<TimestampMicrosecondType>(&output).value(0), + micros_for("2024-03-31 01:30:00") + ); + } + + #[test] + fn test_timestamp_ntz_to_timestamp_handles_ambiguous_time() { + let output = timestamp_ntz_to_timestamp( + array_containing("2024-10-27 01:30:00"), + "Europe/London", + None, + ) + .unwrap(); + + assert_eq!( + as_primitive_array::<TimestampMicrosecondType>(&output).value(0), + 
micros_for("2024-10-27 00:30:00") + ); + } } diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 9f5413933a..fd3e7877a7 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -489,4 +489,24 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH dummyDF.selectExpr("unix_date(cast(NULL as date))")) } } + + test("cast TimestampNTZ to Timestamp - DST edge cases") { + val data = Seq( + Row(java.time.LocalDateTime.parse("2024-03-31T01:30:00")), // Spring forward (Europe/London) + Row(java.time.LocalDateTime.parse("2024-10-27T01:30:00")) // Fall back (Europe/London) + ) + val schema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true))) + spark + .createDataFrame(spark.sparkContext.parallelize(data), schema) + .createOrReplaceTempView("dst_tbl") + + // We `allowIncompatible` here because casts involving TimestampNTZ are marked + // as Incompatible (due to incorrect behaviour when casting from a string) + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Europe/London", + "spark.comet.expression.Cast.allowIncompatible" -> "true") { + checkSparkAnswerAndOperator( + "SELECT ts_ntz, CAST(ts_ntz AS TIMESTAMP) FROM dst_tbl ORDER BY ts_ntz") + } + } }