feat: expand date/time expression support using codegen dispatcher#4417
Open
andygrove wants to merge 21 commits into
Open
feat: expand date/time expression support using codegen dispatcher#4417andygrove wants to merge 21 commits into
andygrove wants to merge 21 commits into
Conversation
…l short-circuit CometBatchKernelCodegen.defaultBody emitted this.col$ord.isNull(i) for every NullIntolerant input, but primitive Arrow vectors (timestamp / int / float / date / bool / ...) are wrapped in CometPlainVector at input-cast time and expose isNullAt rather than the raw Arrow isNull. The short-circuit therefore failed to compile for any primitive-typed column with a Janino "method isNull not declared" error. Share the existing nullCheckMethod helper between emitTypedGetters and defaultBody so both sites pick the right method name per column. Add a source test that pins the chosen method for TimeStampMicroTZVector inputs.
CometDateFormat keeps the native to_char path for UTC sessions with a format
literal in the strftime-mappable whitelist, and now routes every other case
through the Arrow-direct codegen dispatcher (CometScalaUDFCodegen) so that
non-UTC sessions, non-literal formats, and formats outside the whitelist
stay inside the Comet pipeline running Spark's own DateFormatClass.doGenCode.
Refactor: extract the closure-serialize + JvmScalarUdf-proto emission from
CometScalaUDF.convert into a reusable CometScalaUDF.emitJvmCodegenDispatch
helper. Any serde that wants to fall back to a Spark built-in expression
through the dispatcher can call it. Gated by COMET_SCALA_UDF_CODEGEN_ENABLED
so the default remains a clean Spark fallback for those cases until the
dispatcher graduates from experimental.
Reasoning notes:
- DateFormatClass already has a proper doGenCode (not CodegenFallback),
NullIntolerant, and ResolveTimeZone stamps the timeZoneId on it during
analysis. Closure-serializing the bound tree therefore reproduces
Spark-identical behavior for every timezone.
- The kernel cache key already encodes the literal format and timezone via
the serialized expression bytes, so (format, tz) combinations get
distinct cached kernels just like a bespoke (format, tz) -> formatter
cache would. Saves an entire DateFormatUDF.scala class.
Tests:
- date_format - timestamp_ntz input: now runs checkSparkAnswerAndOperator
for every timezone under the codegen flag instead of falling back for
non-UTC.
- Split each previous "falls back to Spark" Scala test into two: one
asserting the codegen-on path stays in Comet, one asserting the
codegen-off path falls back with the dispatcher flag as the reason.
- date_format.sql now pins a non-UTC session timezone and enables the
codegen flag at file scope; all queries are plain query and assert
in-Comet execution.
The CometScalaUDF fallback message was generalized from 'ScalaUDF has no native path' to 'expression has no native path' when the dispatcher helper was extracted for reuse by CometDateFormat.
87 tasks
- Drop the getCompatibleNotes override on CometCodegenDispatch. The docs generator emits compat notes under a heading promising 'no additional configuration', which contradicts a note describing the dispatcher flag. Keep getSupportLevel=Compatible and surface the flag dependency via withInfo / EXPLAIN instead. - Add a sentinel non-error query to each *_ansi.sql fixture. The expect_error semantics pass vacuously when the dispatcher silently falls back to Spark (both paths throw identical exceptions); the sentinel uses checkSparkAnswerAndOperator and fails if Comet did not run the expression natively. - Pin spark.sql.legacy.timeParserPolicy=CORRECTED in to_unix_timestamp_ansi.sql so the JDK java.time formatter is exercised regardless of runtime default; LEGACY policy uses SimpleDateFormat with a different exception class. - Annotate the three ANSI fixtures with MinSparkVersion: 3.5 since the DATETIME_FIELD_OUT_OF_BOUNDS and CANNOT_PARSE_TIMESTAMP error classes were standardized in Spark 3.5. Spark 3.4 coverage is delivered separately.
Mirror the existing MinSparkVersion gate with a MaxSparkVersion gate so SQL fixtures can pair a 3.5+ variant (using post-3.5 error class names) with a 3.4 variant (using the pre-classification JDK java.time exception text). The make_timestamp and to_unix_timestamp ANSI exception paths produce different exception wording on Spark 3.4 versus 3.5+; before this commit only the 3.5+ side had coverage and 3.4 ANSI behavior went untested. Framework: - SqlTestFile gains maxSparkVersion: Option[String]. - SqlFileTestParser recognizes -- MaxSparkVersion: lines. - CometSqlFileTestSuite gains meetsMaxSparkVersion / skipReason helpers; the skip-and-log path now reports whether the constraint was a floor or ceiling. Coverage: - make_timestamp_ansi_spark34.sql: MaxSparkVersion: 3.4, expect_error patterns target the JDK DateTimeException field-name text (MonthOfYear, Invalid date, HourOfDay) which is stable in 3.4's pre-classification error path. - to_unix_timestamp_ansi_spark34.sql: MaxSparkVersion: 3.4, expect_error pattern targets the JDK DateTimeParseException 'could not be parsed' wording.
a3923d4 to
1b952e7
Compare
CI failed on Spark 3.5.8 because the executor-thrown SparkDateTimeException's
getMessage() does NOT preserve the driver-formatted '[DATETIME_FIELD_OUT_OF_BOUNDS]'
error-class prefix; only the inner JDK message ('Invalid value for MonthOfYear ...',
'Invalid date FEBRUARY 30', 'Invalid value for HourOfDay ...') survives the
'Job aborted ... Lost task ... SparkDateTimeException: <inner>' wrapping that
shows up in the test's caught exception.
Switching to the JDK java.time field-name substrings (MonthOfYear, Invalid date,
HourOfDay) makes the assertions stable across Spark 3.4, 3.5.x, and 4.x without
needing a MinSparkVersion gate, so the make_timestamp_ansi_spark34.sql variant
becomes redundant and is deleted in the same commit.
Verified locally: passes under -Pspark-3.4 (3.4.3) and -Pspark-3.5 (3.5.8).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Part of #3202. Supersedes and consolidates #4373.
Expressions covered
Ten Spark date/time expressions are routed through the Arrow-direct codegen dispatcher in this PR. All run inside the Comet pipeline (no operator-level Spark fallback) when
spark.comet.exec.scalaUDF.codegen.enabled=trueis set. Default behavior is unchanged when the flag is off.date_format(originally feat: route date_format through codegen dispatcher for non-native cases #4373; nativeto_charpath retained for UTC + whitelisted-format cases, dispatcher path for everything else)add_monthsmonths_betweenmake_timestamptimestamp_millis(MillisToTimestamp)timestamp_micros(MicrosToTimestamp)unix_secondsunix_millisunix_microsto_unix_timestampRationale for this change
Comet's plan rules fall back to Spark for any expression that lacks a native serde, breaking up the Comet pipeline at the operator boundary. The Arrow-direct codegen dispatcher already in tree (
CometScalaUDFCodegen, behindspark.comet.exec.scalaUDF.codegen.enabled) closure-serializes a bound Catalyst expression, ships it through aJvmScalarUdfproto, and Janino-compiles Spark's owndoGenCodeinto a per-batch kernel that reads Arrow vectors and writes an Arrow output vector directly. For any expression whosedoGenCodeis real (notCodegenFallback) and whose input/output types fitCometBatchKernelCodegen.isSupportedDataType, routing through this dispatcher reproduces Spark behavior exactly without a bespoke UDF class.This PR establishes a reusable helper for that routing pattern and applies it to the ten expressions above in two waves:
DateFormatClassfirst (originally #4373, folded in here), then nine currently-unsupported expressions that share the same shape.What changes are included in this PR?
fix(codegen)— Pre-existing bug surfaced while wiringdate_format.CometBatchKernelCodegen.defaultBodyemittedthis.col$ord.isNull(i)for everyNullIntolerantinput, but primitive Arrow vectors are wrapped inCometPlainVectorat input-cast time and exposeisNullAt, notisNull. Janino rejected the kernel with "methodisNullnot declared".emitTypedGettersalready knew the right method name vianullCheckMethod; the fix exposes that helper sodefaultBodypicks the same name per column ordinal. New source test pins the chosen method forTimeStampMicroTZVectorso the regression can't recur.feat— dispatcher helper extraction — Extract the closure-serialize +JvmScalarUdfemission fromCometScalaUDF.convertintoCometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)so other serdes can reuse the path.CometDateFormat.convertkeeps the nativeto_charpath for UTC + whitelisted-format cases and now calls the helper for everything else. Gated byspark.comet.exec.scalaUDF.codegen.enabled(default false, experimental); default behavior is unchanged.feat—CometCodegenDispatch[T]base class — A one-class wrapper aroundemitJvmCodegenDispatchlets any expression with a realdoGenCodeslot in with a single-lineobjectdeclaration. The helper marks the expressionCompatible()because the dispatcher runs Spark's owndoGenCodeinside the kernel; behavior matches Spark exactly when the flag is on, and the operator falls back cleanly when it is off.feat— nine new datetime serdes — Each of the nine new expressions becomes a one-line singletonextends CometCodegenDispatch[T]and is registered intemporalExpressions. Three are ANSI-sensitive (MakeTimestamp,MillisToTimestamp,ToUnixTimestampcarryfailOnError); the dispatcher inherits the throw site from Spark's owndoGenCode, so exception semantics propagate without any serde-level branching.test—MaxSparkVersionannotation — Mirror the existingMinSparkVersionparser gate with aMaxSparkVersionceiling. TheCometSqlFileTestSuiteskip logic now reports whether a constraint was a floor or ceiling. This lets fixtures pair a 3.5+ variant against a 3.4 variant where the expected error class wording differs.Interval-producing expressions (
MakeInterval/MakeYMInterval/MakeDTInterval) are explicitly out of scope: the dispatcher'sisSupportedDataTypedoes not include Spark's interval types. Version-conditional expressions (TimestampAdd/TimestampDiff3.4+,DayName3.5+,MonthName4.0+) are deferred to a follow-on so this PR avoids touching theCometExprShimfiles.Scaffolded with the
superpowers:brainstormingandsuperpowers:writing-plansskills.How are these changes tested?
CometTemporalExpressionSuitedate_formattests: 10/10 pass. Three "falls back to Spark" tests are paired with a "routes via codegen dispatcher" sibling that enables the flag and asserts in-Comet execution.date_format - timestamp_ntz inputrunscheckSparkAnswerAndOperatorfor every timezone under the codegen flag.CometSqlFileTestSuite: nine new per-expression SQL fixtures (add_months.sql,months_between.sql,make_timestamp.sql,timestamp_millis.sql,timestamp_micros.sql,unix_seconds.sql,unix_millis.sql,unix_micros.sql,to_unix_timestamp.sql) pin a non-UTC session timezone and the codegen flag at file scope.date_format.sqlfrom feat: route date_format through codegen dispatcher for non-native cases #4373 is included.make_timestamp_ansi.sql/make_timestamp_ansi_spark34.sql,to_unix_timestamp_ansi.sql/to_unix_timestamp_ansi_spark34.sql, plustimestamp_millis_ansi.sql. The Spark 3.5+ files match theDATETIME_FIELD_OUT_OF_BOUNDS/CANNOT_PARSE_TIMESTAMPerror classes; the 3.4 files match the underlying JDKjava.timeexception text (MonthOfYear,Invalid date,HourOfDay,could not be parsed). Each ANSI file includes a sentinel non-errorquerythat usescheckSparkAnswerAndOperatorso a silent dispatcher fallback would fail the file (theexpect_errorpath alone passes vacuously because Spark fallback also throws).CometCodegenSourceSuite: includes the newNullIntolerant short-circuit uses isNullAt for CometPlainVector-wrapped columnsregression test plus a parameterizedBucket 4 datetime expressions produce non-empty generated kernel sourcetest covering all nine new expressions.CometCodegenSuite: no regressions in the dispatcher's existing surface.-Pspark-3.4: 284/284 SQL fixtures pass (3.5+ variants correctly skip).