feat: Add direct native shuffle execution optimization [experimental]#3230
andygrove wants to merge 6 commits into `apache:main`
Conversation
This PR introduces an experimental optimization that allows the native shuffle writer to directly execute the child native plan instead of reading intermediate batches via JNI. This avoids the JNI round-trip for single-source native plans.

Current flow: `Native Plan → ColumnarBatch → JNI → ScanExec → ShuffleWriterExec`

Optimized flow: `Native Plan → (directly in native) → ShuffleWriterExec`

The optimization is:
- Disabled by default (`spark.comet.exec.shuffle.directNative.enabled=false`)
- Only applies to `CometNativeShuffle` (not columnar JVM shuffle)
- Only applies to single-source native scans (`CometNativeScanExec`)
- Does not apply to `RangePartitioning` (requires sampling)

Changes:
- `CometShuffleDependency`: added `childNativePlan` field to pass the native plan
- `CometShuffleExchangeExec`: added detection logic for single-source native plans
- `CometShuffleManager`: pass the native plan to the shuffle writer
- `CometNativeShuffleWriter`: use the child native plan directly when available
- `CometConf`: added the `COMET_SHUFFLE_DIRECT_NATIVE_ENABLED` config option
- `CometDirectNativeShuffleSuite`: comprehensive test suite with 15 tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
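A minimal sketch of how the experimental flag might be enabled for a job (the flag is off by default as introduced in this PR; `my-job.py` is a placeholder, and pairing it with `spark.comet.shuffle.mode=native` reflects the scope described above):

```shell
# Hypothetical spark-submit invocation; the config key is from this PR,
# the application name is illustrative only.
spark-submit \
  --conf spark.comet.shuffle.mode=native \
  --conf spark.comet.exec.shuffle.directNative.enabled=true \
  my-job.py
```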
Codecov Report

❌ Patch coverage: additional details and impacted files:

@@ Coverage Diff @@
## main #3230 +/- ##
============================================
+ Coverage 56.12% 59.96% +3.83%
- Complexity 976 1433 +457
============================================
Files 119 170 +51
Lines 11743 15819 +4076
Branches 2251 2616 +365
============================================
+ Hits 6591 9486 +2895
- Misses 4012 5008 +996
- Partials 1140 1325 +185

☔ View full report in Codecov by Sentry.
Subqueries (e.g., bloom filters with might_contain) are registered with the parent execution context ID. Direct native shuffle creates a new execution context with a different ID, causing subquery lookup to fail with "Subquery X not found for plan Y" errors. This change detects ScalarSubquery expressions in the child plan and falls back to the standard execution path when present. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
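The fallback described above can be sketched as a recursive scan of the expression tree. This is a simplified, hypothetical model (a toy `Expr` hierarchy, not Spark's Catalyst classes) that only illustrates the decision logic:

```scala
// Toy expression tree standing in for Catalyst expressions; names are
// illustrative, not the real Spark/Comet classes.
sealed trait Expr { def children: Seq[Expr] }
case class ScalarSubquery(children: Seq[Expr] = Nil) extends Expr
case class MightContain(children: Seq[Expr]) extends Expr
case class Literal(value: Int) extends Expr { val children: Seq[Expr] = Nil }

// True if any node in the tree is a ScalarSubquery. When true, direct
// native shuffle must fall back to the standard path, because the subquery
// was registered under the parent execution context ID.
def containsScalarSubquery(e: Expr): Boolean = e match {
  case _: ScalarSubquery => true
  case other             => other.children.exists(containsScalarSubquery)
}
```

For example, a bloom-filter predicate like `might_contain(<subquery>, col)` would be modeled here as `MightContain(Seq(ScalarSubquery(), Literal(1)))`, for which the check returns true and the optimization is skipped.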
@viirya I'm curious what you think about this idea. Does it make sense?
So this idea is to avoid the JVM/JNI overhead on the input. It sounds good.
Resolve merge conflict in CometNativeShuffleWriter.scala by keeping the refactored inputOperator pattern and adding RoundRobinPartitioning support from main. Enable direct native shuffle by default. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sqlbenchmark run tpch --iterations 3
Thanks for taking a look! I will keep experimenting with this.
Comet TPC-H Benchmark Results (baseline query times and Spark configuration tables not captured)

Automated benchmark run by dfbench
Summary
This PR introduces an experimental optimization that allows the native shuffle writer to directly execute the child native plan instead of reading intermediate batches via JNI. This avoids the JNI round-trip for single-source native plans.

Current flow: `Native Plan → ColumnarBatch → JNI → ScanExec → ShuffleWriterExec`

Optimized flow: `Native Plan → (directly in native) → ShuffleWriterExec`

Configuration

The optimization is controlled by a new config option:

- `spark.comet.exec.shuffle.directNative.enabled` (default: `false`)

Scope

The optimization currently applies when:

- Native shuffle is used (`spark.comet.shuffle.mode=native`)
- The child is a single-source native scan (`CometNativeScanExec`)
- The partitioning is not `RangePartitioning` (which requires sampling)

Changes

- `CometShuffleDependency.scala`: added `childNativePlan` field to pass the native plan to the writer
- `CometShuffleExchangeExec.scala`: added detection logic for single-source native plans
- `CometShuffleManager.scala`: pass the native plan to the shuffle writer
- `CometNativeShuffleWriter.scala`: use the child native plan directly when available
- `CometConf.scala`: added the `COMET_SHUFFLE_DIRECT_NATIVE_ENABLED` config option
- `CometDirectNativeShuffleSuite.scala`: new test suite

Test plan

- New `CometDirectNativeShuffleSuite` with 15 tests
- Existing `CometNativeShuffleSuite` tests still pass (16/16)

🤖 Generated with Claude Code
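The eligibility rules in the Scope section can be sketched as a single predicate. This is a hypothetical, heavily simplified model (toy `Plan` and `Partitioning` types, not Spark's `SparkPlan` or Catalyst partitioning classes) showing the shape of the check that `CometShuffleExchangeExec` would need to make:

```scala
// Toy stand-ins for Spark plan and partitioning types; names mirror the
// real concepts but the classes here are illustrative only.
sealed trait Plan
case class CometNativeScanExec(table: String) extends Plan
case class OtherExec(children: Seq[Plan]) extends Plan

sealed trait Partitioning
case object HashPartitioning extends Partitioning
case object RoundRobinPartitioning extends Partitioning
case object RangePartitioning extends Partitioning

// Direct native shuffle applies only when the feature flag is on, the
// shuffle's child is a single-source native scan, and the partitioning
// does not require sampling (i.e. is not range partitioning).
def canUseDirectNativeShuffle(
    enabled: Boolean, // spark.comet.exec.shuffle.directNative.enabled
    child: Plan,
    partitioning: Partitioning): Boolean = {
  enabled &&
  child.isInstanceOf[CometNativeScanExec] &&
  partitioning != RangePartitioning
}
```

When the predicate is false, the exchange falls back to the standard path (`ColumnarBatch` via JNI into `ScanExec`), so the optimization is purely opportunistic.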