
feat: Add immediate mode option for native shuffle#3845

Open
andygrove wants to merge 6 commits into apache:main from andygrove:immediate-mode-partitioner

Conversation

@andygrove
Member

@andygrove andygrove commented Mar 30, 2026

Which issue does this PR close?

Closes #3855

Rationale for this change

The current MultiPartitionShuffleRepartitioner buffers all input batches in memory before writing partitioned output during shuffle_write. This is relatively efficient because interleave_record_batches can create full-size output batches, avoiding the need to coalesce later, but it uses a lot of memory.

This PR introduces a new ImmediateModePartitioner, which takes a different approach: it partitions incoming batches immediately, using a reusable builder per output partition. Builders are flushed when they reach the target batch size. Flushed batches are then encoded and compressed, and these compressed batches are buffered instead of the uncompressed incoming batches, reducing memory overhead and spilling.

Note that the default shuffle mode is still buffered. I plan to create a separate PR to change the default once this has had more testing. All CI tests pass with immediate as the default.

(screenshot: shuffle-2)

What changes are included in this PR?

  • New shuffle write implementation, and new config (defaults to buffered)

Benchmark Results

I used variations of the following command to run the benchmarks on macOS (M3 Ultra).

/usr/bin/time -l ./target/release/shuffle_bench --input /opt/tpch/sf100/lineitem --limit 100000000 --partitions 200 --codec lz4 --hash-columns 0,3 --memory-limit 2147000000 --mode immediate

Memory usage

(chart: shuffle_peak_memory)

Throughput

(chart: shuffle_throughput)

TPC-H

I ran benchmarks with TPC-H @ 1TB in AWS and saw no regressions.

How are these changes tested?

  • All CI tests pass with immediate mode as the new default
  • New unit tests
  • I ran benchmarks with TPC-H @ 1TB in AWS and saw no regressions

@andygrove andygrove changed the title [EXPERIMENTAL] Add ImmediateModePartitioner for native shuffle feat: Add ImmediateModePartitioner for native shuffle [experimental] Mar 30, 2026
@andygrove andygrove changed the title feat: Add ImmediateModePartitioner for native shuffle [experimental] feat: Add immediate mode option for native shuffle [experimental] Mar 31, 2026
@andygrove andygrove marked this pull request as ready for review April 1, 2026 10:34
@andygrove andygrove changed the title feat: Add immediate mode option for native shuffle [experimental] feat: Add immediate mode option for native shuffle Apr 1, 2026
@andygrove
Member Author

@Kontinuation fyi

@andygrove
Member Author

@milenkovicm you may be interested in this since it could also be applied to Ballista

@milenkovicm
Contributor

Thanks @andygrove, will have a look.

Contributor

@milenkovicm milenkovicm left a comment


  • Written blocks are not ordered by partition, am I correct? (Perhaps documentation about the format of the data file and index file could be added.)
  • At the moment each written block includes the schema definition. Would it be possible to have a "specialised" stream writer that does not write the schema, since it is the same for all blocks?
  • Can spills write to the result file instead of a temporary file?

@andygrove
Member Author

  • Written blocks are not ordered by partition, am I correct? (Perhaps documentation about the format of the data file and index file could be added.)

The final file contains data ordered by partition. I will improve the docs.

  • At the moment each written block includes the schema definition. Would it be possible to have a "specialised" stream writer that does not write the schema, since it is the same for all blocks?

It should be possible to move to the Arrow IPC Stream approach, where the schema is written once per partition. I have experimented with this in the past, but it is quite a big change.

  • Can spills write to the result file instead of a temporary file?

Yes, if there is enough memory. We only spill to temp files if the memory pool rejects a try_grow request.

Member

@Kontinuation Kontinuation left a comment


If I understand it correctly, this is quite similar to what we did before switching to interleave_batch-based repartitioning.

One concern is the memory bloat problem when the number of partitions is large. It is quite common to repartition data into thousands of partitions when working with large datasets in Spark. The memory reserved by the builders would consume a lot of memory even before we start appending data to them. That was the motivation for me to implement the interleave_batch-based approach. See also #887 for details.

Immediate mode works better for a small number of partitions than the interleave_batch-based approach, so I think it is a great feature to have. I would suggest that we invest in implementing sort-based repartitioning to scale better to large numbers of partitions and achieve better performance.

Comment on lines +471 to +533
match &self.partitioning {
    CometPartitioning::Hash(exprs, num_output_partitions) => {
        let num_output_partitions = *num_output_partitions;
        let arrays = exprs
            .iter()
            .map(|expr| expr.evaluate(batch)?.into_array(num_rows))
            .collect::<Result<Vec<_>>>()?;
        let hashes_buf = &mut self.hashes_buf[..num_rows];
        hashes_buf.fill(42_u32);
        create_murmur3_hashes(&arrays, hashes_buf)?;
        let partition_ids = &mut self.partition_ids[..num_rows];
        for (idx, hash) in hashes_buf.iter().enumerate() {
            partition_ids[idx] =
                comet_partitioning::pmod(*hash, num_output_partitions) as u32;
        }
        Ok(num_output_partitions)
    }
    CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => {
        let num_output_partitions = *num_output_partitions;
        let max_hash_columns = *max_hash_columns;
        let num_columns_to_hash = if max_hash_columns == 0 {
            batch.num_columns()
        } else {
            max_hash_columns.min(batch.num_columns())
        };
        let columns_to_hash: Vec<ArrayRef> = (0..num_columns_to_hash)
            .map(|i| Arc::clone(batch.column(i)))
            .collect();
        let hashes_buf = &mut self.hashes_buf[..num_rows];
        hashes_buf.fill(42_u32);
        create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
        let partition_ids = &mut self.partition_ids[..num_rows];
        for (idx, hash) in hashes_buf.iter().enumerate() {
            partition_ids[idx] =
                comet_partitioning::pmod(*hash, num_output_partitions) as u32;
        }
        Ok(num_output_partitions)
    }
    CometPartitioning::RangePartitioning(
        lex_ordering,
        num_output_partitions,
        row_converter,
        bounds,
    ) => {
        let num_output_partitions = *num_output_partitions;
        let arrays = lex_ordering
            .iter()
            .map(|expr| expr.expr.evaluate(batch)?.into_array(num_rows))
            .collect::<Result<Vec<_>>>()?;
        let row_batch = row_converter.convert_columns(arrays.as_slice())?;
        let partition_ids = &mut self.partition_ids[..num_rows];
        for (row_idx, row) in row_batch.iter().enumerate() {
            partition_ids[row_idx] = bounds
                .as_slice()
                .partition_point(|bound| bound.row() <= row)
                as u32;
        }
        Ok(num_output_partitions)
    }
    other => Err(DataFusionError::NotImplemented(format!(
        "Unsupported shuffle partitioning scheme {other:?}"
    ))),
}
Member


I suggest that we move partition ID computation to a separate utility to avoid repeating the same logic in multi partition mode and immediate mode.

"Failed to open spill file for reading: {e}"
))
})?;
let mut write_timer = self.metrics.write_time.timer();
Member


The write_timer seems to only count the time spent merging spill files. Is that enough?

3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner
based on the `partitionerMode` configuration:
- **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning.
As each batch arrives, rows are scattered into per-partition Arrow array builders. When a
Member


When a partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC block to an in-memory buffer.

The current IPC writer uses block compression (compressing each batch individually), which may lead to poor compression ratios. In Gluten, the shuffle writer first serializes and buffers the batches, then performs streaming compression during eviction, achieving better compression ratios. I'm not entirely sure which is better.

Member Author


Thanks. I've been experimenting with switching to the IPC stream approach. I have a draft PR open for this, but it is not fully working yet.

@andygrove
Member Author

If I understand it correctly, this is quite similar to what we did before switching to interleave_batch-based repartitioning.

One concern is the memory bloat problem when the number of partitions is large. It is quite common to repartition data into thousands of partitions when working with large datasets in Spark. The memory reserved by the builders would consume a lot of memory even before we start appending data to them. That was the motivation for me to implement the interleave_batch-based approach. See also #887 for details.

Immediate mode works better for a small number of partitions than the interleave_batch-based approach, so I think it is a great feature to have. I would suggest that we invest in implementing sort-based repartitioning to scale better to large numbers of partitions and achieve better performance.

Thanks. This is good feedback. I will update the tuning guide to explain the trade-offs between the two approaches.

@andygrove andygrove force-pushed the immediate-mode-partitioner branch from 503cf6f to 2c493d6 on April 8, 2026 15:49
Add ImmediateModePartitioner that partitions incoming batches immediately
using per-partition builders, flushing compressed IPC blocks when they
reach target batch size. This reduces memory overhead compared to the
buffered approach that stores all uncompressed rows before writing.

Includes documentation and a config option
(spark.comet.exec.shuffle.partitionerMode). Default is buffered.
@andygrove andygrove force-pushed the immediate-mode-partitioner branch from 2c493d6 to 9750b26 on April 8, 2026 15:50
Address PR review feedback:
- Extract hash and range partition ID assignment into shared
  partition_id module, removing duplication between immediate
  and buffered partitioners
- Fix docs table to show buffered as default mode


Development

Successfully merging this pull request may close these issues.

Experiment: immediate-mode shuffle
