feat(spark): port Spark-compatible Parquet schema adapter from Comet #22341

Open
andygrove wants to merge 2 commits into apache:main from andygrove:spark-parquet-schema-adapter

@andygrove
Member

Which issue does this PR close?

Closes #22339.

Rationale for this change

Issue #22339 asks for a Spark-compatible Parquet reader in the datafusion-spark crate, based on functionality that already exists in apache/datafusion-comet. The schema adapter is the central piece of that reader — it rewrites physical expressions at planning time so that column references against the logical (query) schema resolve correctly to the physical (file) schema while preserving Spark's vectorized-reader semantics.

This PR ports that schema adapter into datafusion-spark so DataFusion users can read Parquet files with Spark semantics by plugging a SparkPhysicalExprAdapterFactory into a FileScanConfig.

What changes are included in this PR?

A new parquet feature on datafusion-spark (off by default), exposing:

  • SparkPhysicalExprAdapterFactory / SparkPhysicalExprAdapter — implements PhysicalExprAdapterFactory for use via FileScanConfigBuilder::with_expr_adapter.
  • SparkParquetOptions with all the version-sensitive flags (allow_type_promotion, return_null_struct_if_all_fields_missing, case_sensitive, use_field_id, ignore_missing_field_id, etc.) and an EvalMode enum.
  • spark_parquet_convert — Spark-compatible nested struct/list/map adaptation, INT96 timezone reinterpret, FixedSizeBinary(16) → UUID rendering.
  • SparkCastColumnExpr, a PhysicalExpr for column-level type adaptation (timestamp micros → millis, nested field-name relabel, fallback to spark_parquet_convert).
  • RejectOnNonEmpty — defers type-promotion rejection to runtime so empty Parquet files still pass (SPARK-26709).
  • ParquetSchemaError — the four Parquet-relevant error variants from Comet's SparkError (SchemaConvert, MissingFieldIds, DuplicateFieldByFieldId, DuplicateFieldCaseInsensitive).
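The deferral idea behind RejectOnNonEmpty can be illustrated with a minimal, standalone sketch. It does not use the PR's actual types: `Batch` is a hypothetical stand-in for a record batch, and the `evaluate` signature is an assumption made for illustration only.

```rust
/// Hypothetical stand-in for a record batch; only the row count matters here.
pub struct Batch {
    pub num_rows: usize,
}

/// Holds the rejection that planning would otherwise have raised immediately.
pub struct RejectOnNonEmpty {
    message: String,
}

impl RejectOnNonEmpty {
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
        }
    }

    /// Empty batches pass through (returning zero rows); the first batch
    /// that actually contains rows surfaces the stored error.
    pub fn evaluate(&self, batch: &Batch) -> Result<usize, String> {
        if batch.num_rows == 0 {
            Ok(0)
        } else {
            Err(self.message.clone())
        }
    }
}
```

Under this shape, a file with zero rows never triggers the error, which is the behavior the SPARK-26709 case calls for.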

The full set of Spark vectorized-reader rejection rules is ported, including:

  • BINARY column rejection rules (no int → string, no binary → decimal without DecimalLogicalTypeAnnotation).
  • Decimal-to-decimal narrowing (isDecimalTypeMatched).
  • Integer-to-decimal narrowing (canReadAsDecimal).
  • Configurable type-promotion rejection (Spark 3.x rejects INT32→INT64 etc; Spark 4.x allows them).
  • Same-Spark-version rejections (long → narrower int, float → int, etc).
  • Scalar/complex type-shape mismatch (SPARK-45604).
  • Field-id and case-insensitive matching with duplicate detection.
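As a rough illustration of the two decimal rules above, the sketch below shows one plausible form of the checks. It is not the ported code: the function names, the `Decimal` and `PhysicalInt` types, and the digit thresholds (10 for INT32, 20 for INT64) are assumptions modeled on how Spark's vectorized reader is commonly described, and the exact rule may differ between Spark versions.

```rust
/// Hypothetical decimal type descriptor (precision, scale).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Decimal {
    pub precision: u8,
    pub scale: i8,
}

/// Decimal-to-decimal check (assumed form): the scale may only grow, and the
/// precision must grow by at least as much, so no digit is lost.
pub fn decimal_type_matched(file: Decimal, requested: Decimal) -> bool {
    let scale_increase = requested.scale as i32 - file.scale as i32;
    let precision_increase = requested.precision as i32 - file.precision as i32;
    scale_increase >= 0 && precision_increase >= scale_increase
}

/// Physical integer width of the Parquet column.
#[derive(Clone, Copy, Debug)]
pub enum PhysicalInt {
    Int32,
    Int64,
}

/// Integer-to-decimal check (assumed form): the requested decimal must have
/// enough integer digits to hold every value of the physical integer type.
pub fn can_read_int_as_decimal(file: PhysicalInt, requested: Decimal) -> bool {
    let integer_digits = requested.precision as i32 - requested.scale as i32;
    let needed = match file {
        PhysicalInt::Int32 => 10, // assumed threshold for 32-bit integers
        PhysicalInt::Int64 => 20, // assumed threshold for 64-bit integers
    };
    integer_digits >= needed
}
```

With these assumed thresholds, an INT32 column read as decimal(9, 0) is rejected while decimal(10, 0) is accepted, matching the narrow/wide split exercised by the integration tests.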

Per-Spark-version behavior

Documented in parquet/mod.rs. Configured via SparkParquetOptions flags:

  • allow_type_promotion — Spark 3.x rejects INT32→INT64 / FLOAT→DOUBLE / INT32→DOUBLE; Spark 4.x allows.
  • return_null_struct_if_all_fields_missing — flips at SPARK-53535 (Spark 4.1+).
  • eval_mode — Spark 4.0 made Ansi the default.
  • TimestampNTZ is supported in the schema mapping.
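The version mapping above could be captured in a small helper along these lines. This is a sketch with local stand-in types, not the crate's API: `VersionDefaults` and `defaults_for_spark` are hypothetical names, and only the two flags whose direction is stated above are included (return_null_struct_if_all_fields_missing is left out because the direction of the flip is not spelled out here).

```rust
/// Local stand-in mirroring the variant names used in this PR.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum EvalMode {
    Legacy,
    Ansi,
}

/// Hypothetical bundle of the version-sensitive settings described above.
#[derive(Debug, PartialEq)]
pub struct VersionDefaults {
    pub allow_type_promotion: bool,
    pub eval_mode: EvalMode,
}

/// Picks defaults from the Spark major version: 3.x rejects type promotion
/// and defaults to Legacy; 4.x allows promotion and defaults to Ansi.
pub fn defaults_for_spark(major: u32) -> VersionDefaults {
    let is_4_or_later = major >= 4;
    VersionDefaults {
        allow_type_promotion: is_4_or_later,
        eval_mode: if is_4_or_later {
            EvalMode::Ansi
        } else {
            EvalMode::Legacy
        },
    }
}
```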

Simplified vs Comet (deliberately)

  • Primitive scalar casts stay as DataFusion's CastExpr rather than Comet's full Spark-compatible Cast PhysicalExpr. Spark's rejection rules still apply at the schema-adapter level; only the cast kernel itself is DataFusion's. Adding a Spark-specific Cast PhysicalExpr to datafusion-spark is a separate task.
  • Error taxonomy: only the four Parquet-relevant SparkError variants are ported. Comet's full SparkError enum exists for its JNI bridge to the Spark JVM, which is not relevant here.
  • HDFS / object-store cache / JSON error serialization: not ported (Comet-specific concerns).

Are these changes tested?

Yes — 27 new tests, all passing (5 unit + 22 integration tests that round-trip Parquet through DataSourceExec). The integration tests cover every rejection rule that has a corresponding test in Comet's schema_adapter.rs:

  • BINARY-as-non-string/binary, string-as-int, binary-as-decimal
  • Int32-as-narrow-decimal (rejected) / Int32-as-wide-decimal (allowed)
  • Int64-as-narrow-decimal, decimal precision/int-precision narrowing
  • Decimal widening (allowed sanity check)
  • INT64 → narrower int / float, float → int, double → float, int → float
  • INT32 / INT64 → date / timestamp without annotations
  • Date → Timestamp(LTZ), Timestamp → Date
  • Empty file with disallowed widening (SPARK-26709 — passes)
  • Non-empty file with disallowed widening (rejected at runtime)
  • Unsigned-int round-trip (Iceberg compatibility)
  • Case-insensitive duplicate field detection

Total cargo test -p datafusion-spark --features "core parquet" --lib: 253 passed, 0 failed.

Are there any user-facing changes?

Yes — new public API behind the parquet feature on datafusion-spark:

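use std::sync::Arc;

// Import path assumed: PhysicalExprAdapterFactory from DataFusion's
// physical-expr-adapter crate.
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_spark::parquet::{
    EvalMode, SparkParquetOptions, SparkPhysicalExprAdapterFactory,
};

// Start from Legacy eval mode with a UTC timezone.
let mut options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
options.allow_type_promotion = false; // Spark 3.x rejects type promotion

let factory: Arc<dyn PhysicalExprAdapterFactory> =
    Arc::new(SparkPhysicalExprAdapterFactory::new(options, None));

// Plug into FileScanConfigBuilder::with_expr_adapter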

No changes to existing APIs; the new module is gated behind the optional parquet feature.

🤖 Generated with Claude Code

Adds a `parquet` feature to `datafusion-spark` exposing a
`SparkPhysicalExprAdapterFactory` that mirrors Apache Spark's
vectorized Parquet reader semantics. Faithful port of
`apache/datafusion-comet`'s schema adapter on apache/main.

Closes apache#22339

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@github-actions github-actions Bot added the spark label May 18, 2026
@andygrove
Member Author

@shehabgamin fyi

@andygrove andygrove marked this pull request as ready for review May 18, 2026 16:49
- Short-circuit duplicate-field tree walk via Err propagation.
- Cache `types_match` on `SparkCastColumnExpr` to skip per-batch DataType compare.
- Drop redundant `Option<SchemaRef>` for `original_physical_schema` (gate on `case_sensitive` directly).
- Pre-build case-folded HashSet of physical names for O(1) `is_missing` lookup
  in `replace_missing_with_defaults` (was O(d × n)).
- Replace `O(n × m)` `eq_ignore_ascii_case` scan in `remap_physical_schema`
  with `HashSet<String>` of pre-lowercased names.
- Drop micros→millis specialization in `SparkCastColumnExpr::evaluate`;
  arrow's cast handles it.
- Dedupe `parse_field_id`/`field_id` between `schema_adapter.rs` and
  `parquet_support.rs`.
- Drop unnecessary `array.data_type().clone()` in `parquet_convert_array`.
- Reduce `remap_physical_schema` parameter list (3 bools → `&SparkParquetOptions`).
- Extract `find_field` and `rename_field` helpers; collapse 3x case-sensitive
  lookup duplication and 2x field-rename block duplication.
- Reduce test boilerplate: extract `execute_with_factory` helper used by all
  3 odd-out tests; `temp_parquet_path()` returns `String` directly.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
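
The case-folded lookup mentioned in the second commit can be sketched as follows. This is an illustration of the general technique, not the PR's code: `fold_names` and the free-standing `is_missing` are hypothetical helpers, and ASCII lowercasing is assumed because the replaced scan used `eq_ignore_ascii_case`.

```rust
use std::collections::HashSet;

/// Builds the lowercase name set once, up front, instead of comparing every
/// logical name against every physical name.
pub fn fold_names(physical_names: &[&str]) -> HashSet<String> {
    physical_names
        .iter()
        .map(|name| name.to_ascii_lowercase())
        .collect()
}

/// Average O(1) membership test against the pre-folded set.
pub fn is_missing(folded: &HashSet<String>, logical_name: &str) -> bool {
    !folded.contains(&logical_name.to_ascii_lowercase())
}
```

The set costs one pass over the physical names; each later lookup then avoids a linear scan.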