DataFusion / DataFusion-Spark functions whose Arrow return type drifts from Spark catalyst's declared type #4515

@mbutrovich

I keep running into this in PRs that try to reduce FFI deep copies in the system (#4393, #4507), and I keep introducing workarounds on those branches, so I'm opening this issue to track it.

Background

Comet serializes Spark catalyst types into a proto schema, and the native planner expects DataFusion physical expressions to produce arrays matching those Arrow types. Several DataFusion / datafusion-spark functions return arrays whose Arrow type does not match what Spark catalyst declares.

On main, this is masked because batches cross JVM-to-native FFI boundaries inside a stage. Each crossing deep-copies and re-advertises the stream schema, and the consuming ScanExec.build_record_batch casts to the declared schema. Together they reshape Timestamp(us) -> Timestamp(us, "UTC"), List(nullable Int32) -> List(non-null Int32), etc. before validation runs.
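To make the masking concrete, here is a minimal sketch in plain Python. It is a toy model, not Comet or Arrow code: types are plain strings, and `validate` and `scan_cast` are hypothetical stand-ins for the strict schema check and for the cast done at the scan.

```python
def validate(declared, produced):
    """Strict check: every produced column type must equal the declared one."""
    for want, got in zip(declared, produced):
        if want != got:
            raise ValueError(
                f"column types must match schema types, expected {want} but found {got}"
            )


def scan_cast(declared, produced):
    """Toy stand-in for the scan cast: each column is re-typed to the declared type."""
    return [want for want, _ in zip(declared, produced)]


declared = ['Timestamp(us, "UTC")', "Int64"]
produced = ["Timestamp(us)", "Int32"]  # what the drifting functions return

# With the FFI hop: the cast runs before validation, so the drift is hidden.
validate(declared, scan_cast(declared, produced))

# Without the hop: the drifted types reach validation directly and it fails.
try:
    validate(declared, produced)
except ValueError as err:
    print(err)
```

The point of the sketch is only the ordering: validation passes when a cast sits in front of it, regardless of what the function actually produced.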

Reducing FFI hops removes both the deep-copy re-stamp and the scan cast, so the drift surfaces as Invalid argument error: column types must match schema types, expected ... but found .... PRs that hit this work around it with a Projection(Cast) and a warning per drifting column. The right fix is to correct the function return types upstream so the cast becomes a no-op.
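The workaround amounts to comparing declared and produced types per column and casting only where they differ. The sketch below models that in plain Python under stated assumptions: `plan_alignment` is a hypothetical helper, the column names are made up, and types are plain strings rather than real Arrow types.

```python
import warnings


def plan_alignment(declared, produced):
    """Return the casts needed to align produced column types with declared ones.

    Both arguments map column name -> type name (plain strings in this toy model).
    Columns whose types already match need no cast and produce no warning.
    """
    casts = []
    for name, want in declared.items():
        got = produced[name]
        if got != want:
            warnings.warn(f"column {name!r} drifted: casting {got} to {want}")
            casts.append((name, got, want))
    return casts


declared = {"bucket": "Int64", "ts": 'Timestamp(us, "UTC")'}
drifted = {"bucket": "Int32", "ts": 'Timestamp(us, "UTC")'}

print(plan_alignment(declared, drifted))   # one cast, for "bucket" only
print(plan_alignment(declared, declared))  # no casts once upstream types are fixed
```

Once a function's return type is corrected upstream, its column drops out of the plan and the warning for it disappears, which is what "the cast becomes a no-op" means here.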

Related issue

Observed mismatches

| Spark expression | Catalyst declares | DataFusion / datafusion-spark produces |
| --- | --- | --- |
| width_bucket | LongType (Int64) | Int32 |
| date_trunc(unit, ts) | Timestamp(us, "UTC") | Timestamp(us) (no timezone) |
| collect_set(int) | List(non-null Int32) | List(nullable Int32) |

This list is not exhaustive. Any warning emitted by the workaround cast layer on a branch that reduces FFI hops identifies a candidate to add here.
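The three mismatches listed so far are three different kinds of drift: integer width, timezone, and element nullability. A rough way to classify new candidates is sketched below. The `Int`, `Timestamp`, and `ListOf` classes and `drift_kind` are hypothetical and only model the attributes that matter here; they are not Arrow's type API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Int:
    bits: int


@dataclass(frozen=True)
class Timestamp:
    unit: str
    tz: Optional[str] = None


@dataclass(frozen=True)
class ListOf:
    element: object
    element_nullable: bool


def drift_kind(declared, produced):
    """Name the attribute in which the produced type differs from the declared one."""
    if declared == produced:
        return "none"
    if type(declared) is not type(produced):
        return "different type family"
    if isinstance(declared, Int):
        return "integer width"
    if isinstance(declared, Timestamp):
        return "timezone" if declared.unit == produced.unit else "time unit"
    if isinstance(declared, ListOf):
        if declared.element != produced.element:
            return "list element type"
        return "element nullability"
    return "other"


# (declared by catalyst, produced by the function), taken from the mismatches above.
observed = {
    "width_bucket": (Int(64), Int(32)),
    "date_trunc": (Timestamp("us", "UTC"), Timestamp("us")),
    "collect_set": (ListOf(Int(32), False), ListOf(Int(32), True)),
}

for name, (declared, produced) in observed.items():
    print(name, "->", drift_kind(declared, produced))
```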

Other victims of the same drift

The shuffle writer is not the only consumer that assumes catalyst-aligned Arrow types. The codegen kernel input layer (CometBatchKernelCodegenInput.emitInputCasts) emits unchecked Java casts to specific Arrow vector classes (BigIntVector, TimeStampMicroTZVector, ...) chosen from the catalyst dataType of each bound Attribute. If a drifted batch reaches a codegen kernel, the cast throws ClassCastException on the first batch. Today this is masked because codegen reads JVM-side vectors that already went through an FFI deep-copy + ScanExec cast; FFI-reduced pipelines that feed codegen directly would need the same alignment step at the JVM/native boundary feeding the kernel.
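The codegen failure mode can be modeled the same way. The sketch below is plain Python, not the generated Java: the empty classes borrow Arrow Java vector class names only as labels, the catalyst-to-vector mapping is an assumption inferred from the examples above, and `TypeError` stands in for `ClassCastException`.

```python
class IntVector: ...
class BigIntVector: ...
class TimeStampMicroVector: ...
class TimeStampMicroTZVector: ...


# Assumed mapping: the vector class the kernel expects for each catalyst type.
EXPECTED_VECTOR = {
    "LongType": BigIntVector,
    "TimestampType": TimeStampMicroTZVector,
}


def cast_input(catalyst_type, vector):
    """Model of a cast chosen from the catalyst type, not from the actual vector."""
    expected = EXPECTED_VECTOR[catalyst_type]
    if not isinstance(vector, expected):
        raise TypeError(
            f"{type(vector).__name__} cannot be cast to {expected.__name__}"
        )
    return vector


# Aligned batch: the vector matches what catalyst declared, so the cast succeeds.
cast_input("LongType", BigIntVector())

# Drifted batch: width_bucket produced Int32, so the kernel sees an IntVector.
try:
    cast_input("LongType", IntVector())
except TypeError as err:
    print(err)
```

In this model the failure happens as soon as the first drifted vector is bound, which matches the "first batch" behavior described above.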
