feat: Add immediate mode option for native shuffle #3845
andygrove wants to merge 6 commits into apache:main
Conversation
@Kontinuation fyi

@milenkovicm you may be interested in this since it could also be applied to Ballista

thanks @andygrove, will have a look
milenkovicm left a comment
- Written blocks are not ordered by partition, am I correct? (Perhaps documentation about the format of the data file and index file could be added.)
- At the moment each written block includes the schema definition. Would it be possible to have a "specialised" stream writer that does not write the schema (as it is the same for all blocks)?
- Can spill write to the result file instead of a temporary file?
The final file contains data ordered by partition. I will improve the docs.
It should be possible to move to the Arrow IPC Stream approach, where the schema is written once per partition. I have experimented with this in the past but it is quite a big change.
Yes, if there is enough memory. We only spill to the temp files if the memory pool rejects a request to try_grow.
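The spill-only-on-rejection behavior described above can be illustrated with a minimal std-only sketch. The toy `MemoryPool`, `try_grow`, and `buffer_or_spill` names are hypothetical stand-ins, not the actual Comet/DataFusion memory pool API:

```rust
// Toy illustration of spill-only-on-rejection: names are hypothetical,
// not the actual Comet/DataFusion memory pool API.
struct MemoryPool {
    capacity: usize,
    used: usize,
}

impl MemoryPool {
    /// Grant the reservation only if it fits; mirrors the semantics of
    /// a fallible `try_grow` on a bounded pool.
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes <= self.capacity {
            self.used += bytes;
            true
        } else {
            false
        }
    }
}

/// Keep the block in memory when the pool accepts the reservation;
/// otherwise "spill" it (here: just record the spilled size).
fn buffer_or_spill(pool: &mut MemoryPool, block: Vec<u8>, spilled: &mut usize) -> Option<Vec<u8>> {
    let len = block.len();
    if pool.try_grow(len) {
        Some(block) // retained in memory
    } else {
        *spilled += len; // real code would write to a temp spill file
        None
    }
}

fn main() {
    let mut pool = MemoryPool { capacity: 10, used: 0 };
    let mut spilled = 0;
    // First block fits in the pool.
    assert!(buffer_or_spill(&mut pool, vec![0u8; 8], &mut spilled).is_some());
    // Second block exceeds the remaining capacity, so it spills.
    assert!(buffer_or_spill(&mut pool, vec![0u8; 8], &mut spilled).is_none());
    assert_eq!(spilled, 8);
}
```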
Kontinuation left a comment
If I understand it correctly, this is quite similar to what we did before switching to interleave_batch-based repartitioning.
One concern is the memory bloat problem when the number of partitions is large. It is quite common to repartition the data into 1000s of partitions when working with large datasets in Spark. The builders would reserve a large amount of memory even before we start to append data into them. That was the motivation for me to implement the interleave_batch-based approach. See also #887 for details.
The immediate mode works better for a small number of partitions than the interleave_batch-based approach, so I think it is a great feature to have. I would suggest that we invest in implementing sort-based repartitioning to scale better to large numbers of partitions and achieve better performance.
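For readers unfamiliar with the interleave-based approach mentioned above, here is a simplified std-only sketch of the idea: record (batch, row) addresses per partition and gather them in one pass, rather than eagerly scattering rows into per-partition builders. Plain `Vec<i32>` columns stand in for Arrow arrays, and the hash is just `value % num_partitions`; all names are illustrative, not the actual Comet code:

```rust
// Gather rows addressed by (batch_index, row_index) pairs; this plays the
// role of Arrow's interleave kernel in this simplified sketch.
fn interleave(batches: &[Vec<i32>], indices: &[(usize, usize)]) -> Vec<i32> {
    indices.iter().map(|&(b, r)| batches[b][r]).collect()
}

fn repartition(batches: &[Vec<i32>], num_partitions: usize) -> Vec<Vec<i32>> {
    // Collect row addresses per partition (toy hash: value % num_partitions,
    // assuming non-negative values).
    let mut per_partition: Vec<Vec<(usize, usize)>> = vec![Vec::new(); num_partitions];
    for (b, batch) in batches.iter().enumerate() {
        for (r, value) in batch.iter().enumerate() {
            per_partition[(*value as usize) % num_partitions].push((b, r));
        }
    }
    // Materialize each output partition in one gather, producing full-size
    // batches without a later coalesce step.
    per_partition
        .iter()
        .map(|idx| interleave(batches, idx))
        .collect()
}

fn main() {
    let batches = vec![vec![0, 1, 2], vec![3, 4]];
    let parts = repartition(&batches, 2);
    assert_eq!(parts, vec![vec![0, 2, 4], vec![1, 3]]);
}
```

The trade-off under discussion: this approach only tracks small index pairs per partition until output time, whereas per-partition builders reserve buffer capacity for every partition up front.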
```rust
match &self.partitioning {
    CometPartitioning::Hash(exprs, num_output_partitions) => {
        let num_output_partitions = *num_output_partitions;
        let arrays = exprs
            .iter()
            .map(|expr| expr.evaluate(batch)?.into_array(num_rows))
            .collect::<Result<Vec<_>>>()?;
        let hashes_buf = &mut self.hashes_buf[..num_rows];
        hashes_buf.fill(42_u32);
        create_murmur3_hashes(&arrays, hashes_buf)?;
        let partition_ids = &mut self.partition_ids[..num_rows];
        for (idx, hash) in hashes_buf.iter().enumerate() {
            partition_ids[idx] =
                comet_partitioning::pmod(*hash, num_output_partitions) as u32;
        }
        Ok(num_output_partitions)
    }
    CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => {
        let num_output_partitions = *num_output_partitions;
        let max_hash_columns = *max_hash_columns;
        let num_columns_to_hash = if max_hash_columns == 0 {
            batch.num_columns()
        } else {
            max_hash_columns.min(batch.num_columns())
        };
        let columns_to_hash: Vec<ArrayRef> = (0..num_columns_to_hash)
            .map(|i| Arc::clone(batch.column(i)))
            .collect();
        let hashes_buf = &mut self.hashes_buf[..num_rows];
        hashes_buf.fill(42_u32);
        create_murmur3_hashes(&columns_to_hash, hashes_buf)?;
        let partition_ids = &mut self.partition_ids[..num_rows];
        for (idx, hash) in hashes_buf.iter().enumerate() {
            partition_ids[idx] =
                comet_partitioning::pmod(*hash, num_output_partitions) as u32;
        }
        Ok(num_output_partitions)
    }
    CometPartitioning::RangePartitioning(
        lex_ordering,
        num_output_partitions,
        row_converter,
        bounds,
    ) => {
        let num_output_partitions = *num_output_partitions;
        let arrays = lex_ordering
            .iter()
            .map(|expr| expr.expr.evaluate(batch)?.into_array(num_rows))
            .collect::<Result<Vec<_>>>()?;
        let row_batch = row_converter.convert_columns(arrays.as_slice())?;
        let partition_ids = &mut self.partition_ids[..num_rows];
        for (row_idx, row) in row_batch.iter().enumerate() {
            partition_ids[row_idx] = bounds
                .as_slice()
                .partition_point(|bound| bound.row() <= row)
                as u32;
        }
        Ok(num_output_partitions)
    }
    other => Err(DataFusionError::NotImplemented(format!(
        "Unsupported shuffle partitioning scheme {other:?}"
    ))),
}
```
I suggest that we move partition ID computation to a separate utility to avoid repeating the same logic in multi partition mode and immediate mode.
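A shared utility along these lines might look like the following sketch. The module and function names (`pmod`, `assign_partition_ids`) are hypothetical; only the hashes-to-partition-IDs loop is taken from the diff above, and the `pmod` body here is an illustrative reimplementation, not Comet's:

```rust
// Hypothetical shared helper: both the Hash and RoundRobin arms above end
// with the same hashes -> partition IDs loop, which could live in one place.

/// Positive modulo on an i32-interpreted hash: the result is always in 0..n.
/// Illustrative stand-in for `comet_partitioning::pmod`.
fn pmod(hash: u32, n: usize) -> usize {
    let r = (hash as i32) % (n as i32);
    (if r < 0 { r + n as i32 } else { r }) as usize
}

/// Fill `partition_ids` in place from precomputed per-row hashes.
fn assign_partition_ids(hashes: &[u32], num_output_partitions: usize, partition_ids: &mut [u32]) {
    for (idx, hash) in hashes.iter().enumerate() {
        partition_ids[idx] = pmod(*hash, num_output_partitions) as u32;
    }
}

fn main() {
    // Hashes that are negative as i32 still map into 0..n.
    assert_eq!(pmod(u32::MAX, 10), 9); // u32::MAX as i32 == -1
    let hashes = [5u32, 17, 23];
    let mut ids = [0u32; 3];
    assign_partition_ids(&hashes, 4, &mut ids);
    assert_eq!(ids, [1, 1, 3]);
}
```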
```rust
            "Failed to open spill file for reading: {e}"
        ))
    })?;
let mut write_timer = self.metrics.write_time.timer();
```
The write_timer seems to only count the time spent merging spill files. Is that enough?
```md
3. **Partitioning**: `ShuffleWriterExec` receives batches and routes to the appropriate partitioner
   based on the `partitionerMode` configuration:
   - **Immediate mode** (`ImmediateModePartitioner`): For hash/range/round-robin partitioning.
     As each batch arrives, rows are scattered into per-partition Arrow array builders. When a
```
When a partition's builder reaches the target batch size, it is flushed as a compressed Arrow IPC block to an in-memory buffer.
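That flush-on-threshold behavior can be sketched in std-only form. Byte vectors stand in for Arrow builders and compressed IPC blocks, and the `PartitionBuffer` name and layout are hypothetical, not the actual Comet types:

```rust
// Rough sketch of immediate-mode flushing: each partition accumulates rows
// in a builder; when the builder reaches the target batch size it is
// "encoded" (here a plain move stands in for compressed Arrow IPC) and the
// compact block is buffered instead of the raw rows. Names are hypothetical.
struct PartitionBuffer {
    builder: Vec<u8>,             // stands in for per-partition Arrow builders
    flushed_blocks: Vec<Vec<u8>>, // stands in for compressed IPC blocks
    target_batch_size: usize,
}

impl PartitionBuffer {
    fn append(&mut self, row: u8) {
        self.builder.push(row);
        if self.builder.len() >= self.target_batch_size {
            self.flush();
        }
    }

    fn flush(&mut self) {
        if !self.builder.is_empty() {
            // Real code would encode and compress the batch here.
            self.flushed_blocks.push(std::mem::take(&mut self.builder));
        }
    }
}

fn main() {
    let mut p = PartitionBuffer {
        builder: Vec::new(),
        flushed_blocks: Vec::new(),
        target_batch_size: 3,
    };
    for row in 0..7 {
        p.append(row);
    }
    assert_eq!(p.flushed_blocks.len(), 2); // flushed after rows 3 and 6
    assert_eq!(p.builder.len(), 1);        // one pending row remains
    p.flush();                             // final partial block at end of input
    assert_eq!(p.flushed_blocks.len(), 3);
}
```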
The current IPC writer uses block compression (compressing each batch), which may lead to poor compression ratios. In Gluten, the shuffle writer first serializes and buffers the batches, then performs streaming compression during eviction, achieving better compression ratios. I'm not entirely sure which is better.
Thanks. I've been experimenting with switching to IPC stream approach. I have a draft PR open for this, but it is not fully working yet.
Thanks. This is good feedback. I will update the tuning guide to explain these trade-offs between the two approaches.
Force-pushed 503cf6f to 2c493d6
Add ImmediateModePartitioner that partitions incoming batches immediately using per-partition builders, flushing compressed IPC blocks when they reach the target batch size. This reduces memory overhead compared to the buffered approach that stores all uncompressed rows before writing. Includes documentation and a config option (spark.comet.exec.shuffle.partitionerMode). Default is buffered.
Force-pushed 2c493d6 to 9750b26
Address PR review feedback:
- Extract hash and range partition ID assignment into a shared partition_id module, removing duplication between the immediate and buffered partitioners
- Fix docs table to show buffered as the default mode
Which issue does this PR close?
Closes #3855
Rationale for this change
The current `MultiPartitionShuffleRepartitioner` tries to buffer all input batches in memory before writing partitioned output during `shuffle_write`. This is relatively efficient, because `interleave_record_batches` can create full-size output batches, avoiding the need to coalesce later. However, it uses a lot of memory.

This PR introduces a new `ImmediateModePartitioner`, which takes a different approach: it partitions incoming batches immediately and uses re-usable builders per output partition. These builders are flushed when they reach the target batch size. The flushed batches are then encoded and compressed, and the compressed batches are buffered instead of the uncompressed incoming batches, reducing memory overhead and spilling.

Note that the default shuffle is still `buffered`. I plan on creating a separate PR to change the default once this has had more testing. All CI tests did pass with the default as `immediate`.

What changes are included in this PR?
- New `ImmediateModePartitioner` and a config option `spark.comet.exec.shuffle.partitionerMode` (default `buffered`)

Benchmark Results
I used variations of the following command to run the benchmarks on macOS (M3 Ultra).
Memory usage
Throughput
TPC
I ran benchmarks with TPC-H @ 1TB in AWS and saw no regressions.
How are these changes tested?