I'm running into this in multiple PRs that try to reduce the FFI deep copies in the system (#4393, #4507), so I'm opening this issue to track it; I keep introducing workarounds on those branches.
Background
Comet serializes Spark catalyst types into a proto schema, and the native planner expects DataFusion physical expressions to produce arrays matching the Arrow types derived from that schema. Several DataFusion / `datafusion-spark` functions return arrays whose Arrow type does not match the type Spark catalyst declares.
On `main`, this is masked because batches cross JVM-to-native FFI boundaries inside a stage. Each crossing deep-copies and re-advertises the stream schema, and the consuming `ScanExec.build_record_batch` casts to the declared schema. Together they reshape `Timestamp(us)` -> `Timestamp(us, "UTC")`, `List(nullable Int32)` -> `List(non-null Int32)`, etc., before validation runs.
Reducing FFI hops removes both the deep-copy re-stamp and the scan cast, so the drift surfaces as `Invalid argument error: column types must match schema types, expected ... but found ...`. PRs that hit this work around it with a `Projection(Cast)` that casts each drifting column and logs a warning for it. The right fix is to correct the function return types upstream so that the cast becomes a no-op.
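A minimal sketch of what that workaround layer does, assuming column types are modeled as rendered strings rather than arrow-rs `DataType`s. `plan_alignment` and `ColumnPlan` are hypothetical names, not Comet code. Columns whose produced type already equals the declared type pass through, so once the upstream return types are corrected the plan contains no casts and no warnings.

```rust
/// What the hypothetical alignment projection does with one column.
#[derive(Debug, PartialEq)]
enum ColumnPlan {
    /// Produced type already matches the declared type: no cast needed.
    PassThrough,
    /// Types drift: cast from the produced type `from` to the declared type `to`.
    Cast { from: String, to: String },
}

/// Compares the declared (catalyst-derived) schema with the schema the native
/// plan actually produced and decides, per column, whether a cast is needed.
/// Returns the per-column plan plus one warning per drifting column.
/// Sketch only: types are rendered strings, not arrow-rs DataTypes.
fn plan_alignment(
    declared: &[(&str, &str)],
    produced: &[(&str, &str)],
) -> Result<(Vec<ColumnPlan>, Vec<String>), String> {
    if declared.len() != produced.len() {
        return Err(format!(
            "column count mismatch: expected {} but found {}",
            declared.len(),
            produced.len()
        ));
    }
    let mut plans = Vec::new();
    let mut warnings = Vec::new();
    for ((name, want), (_, got)) in declared.iter().zip(produced) {
        if want == got {
            plans.push(ColumnPlan::PassThrough);
        } else {
            // One warning per drifting column, mirroring the workaround.
            warnings.push(format!(
                "column {name}: expected {want} but found {got}; inserting cast"
            ));
            plans.push(ColumnPlan::Cast {
                from: got.to_string(),
                to: want.to_string(),
            });
        }
    }
    Ok((plans, warnings))
}
```

Feeding it the three rows from the mismatch table yields one `Cast` and one warning per column, while comparing the declared schema against itself yields only `PassThrough` entries.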
Related issue
`width_bucket` returns `Int32` instead of `Int64`.
Observed mismatches
| Spark expression | Catalyst declares (Arrow type) | DataFusion / `datafusion-spark` produces |
|---|---|---|
| `width_bucket` | `LongType` (`Int64`) | `Int32` |
| `date_trunc(unit, ts)` | `Timestamp(us, "UTC")` | `Timestamp(us)` (no timezone) |
| `collect_set(int)` | `List(non-null Int32)` | `List(nullable Int32)` |
Not exhaustive. Each warning emitted by the workaround cast layer on a branch that reduces FFI hops is a candidate to add here.
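To make the table concrete, the sketch below models the catalyst-to-Arrow mapping the "Catalyst declares" column assumes, plus a strict comparison against what the native function produced. `CatalystType`, `ArrowType`, `to_arrow`, and `drift` are hypothetical stand-ins rather than Comet's serde code, and the mapping covers only the types in the table (`LongType` to `Int64`, `TimestampType` to a microsecond timestamp in `"UTC"`, `ArrayType(IntegerType, containsNull = false)` to a list with non-null `Int32` elements). Equality deliberately includes the timezone and the list element's nullability, since those are exactly the differences the schema validation rejects.

```rust
/// Minimal subset of Spark catalyst types needed for the table rows (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum CatalystType {
    Integer,
    Long,
    Timestamp,
    Array { element: Box<CatalystType>, contains_null: bool },
}

/// Minimal subset of Arrow types (hypothetical). Derived equality compares
/// the timezone and list element nullability, like strict schema validation.
#[derive(Debug, Clone, PartialEq)]
enum ArrowType {
    Int32,
    Int64,
    /// Microsecond timestamp with an optional timezone.
    TimestampMicros { tz: Option<String> },
    List { element: Box<ArrowType>, element_nullable: bool },
}

/// Catalyst -> Arrow mapping assumed by the "Catalyst declares" column.
fn to_arrow(t: &CatalystType) -> ArrowType {
    match t {
        CatalystType::Integer => ArrowType::Int32,
        CatalystType::Long => ArrowType::Int64,
        CatalystType::Timestamp => ArrowType::TimestampMicros { tz: Some("UTC".to_string()) },
        CatalystType::Array { element, contains_null } => ArrowType::List {
            element: Box::new(to_arrow(element)),
            element_nullable: *contains_null,
        },
    }
}

/// Returns Some((expected, produced)) when the produced Arrow type drifts
/// from the type derived from the catalyst declaration, None otherwise.
fn drift(declared: &CatalystType, produced: &ArrowType) -> Option<(ArrowType, ArrowType)> {
    let expected = to_arrow(declared);
    if &expected == produced {
        None
    } else {
        Some((expected, produced.clone()))
    }
}
```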
Other victims of the same drift
The shuffle writer is not the only consumer that assumes catalyst-aligned Arrow types. The codegen kernel input layer (`CometBatchKernelCodegenInput.emitInputCasts`) emits unchecked Java casts to specific Arrow vector classes (`BigIntVector`, `TimeStampMicroTZVector`, ...) chosen from the catalyst `dataType` of each bound `Attribute`. If a drifted batch reaches a codegen kernel, the cast throws `ClassCastException` on the first batch. Today this is masked because codegen reads JVM-side vectors that have already gone through an FFI deep copy and a `ScanExec` cast; FFI-reduced pipelines that feed codegen directly would need the same alignment step at the JVM/native boundary feeding the kernel.
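A rough Rust analogue of that boundary, assuming a toy model in which vector classes are distinct types behind `dyn Any`. `IntVector` and `BigIntVector` here are toy structs that only borrow the Arrow Java class names, and `align_long_column` is a hypothetical alignment step, not part of Comet. Where the generated Java code would do an unchecked cast and throw `ClassCastException`, the sketch checks the concrete type first and widens a drifted `Int32` column to `Int64` before a kernel compiled for `LongType` sees it.

```rust
use std::any::Any;

/// Toy stand-ins named after Arrow Java's IntVector / BigIntVector.
struct IntVector(Vec<Option<i32>>);
struct BigIntVector(Vec<Option<i64>>);

/// Hypothetical alignment step run before a kernel generated for a catalyst
/// LongType column. Instead of an unchecked cast, it inspects the concrete
/// vector type and widens Int32 data to Int64 when the producer drifted.
fn align_long_column(v: Box<dyn Any>) -> Result<BigIntVector, String> {
    // Already aligned: hand the vector through unchanged.
    let v = match v.downcast::<BigIntVector>() {
        Ok(aligned) => return Ok(*aligned),
        Err(other) => other,
    };
    // Drifted producer (e.g. width_bucket returning Int32): widen losslessly,
    // preserving nulls.
    match v.downcast::<IntVector>() {
        Ok(narrow) => Ok(BigIntVector(
            narrow.0.iter().map(|&x| x.map(i64::from)).collect(),
        )),
        Err(_) => Err("unexpected vector type for LongType column".to_string()),
    }
}
```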