Make BatchPartitioner::partition_iter public for downstream async consumers #21311
Description
Is your feature request related to a problem or challenge?
BatchPartitioner::partition takes a sync FnMut closure, which means consumers that need to do I/O with the partitioned batches have to do it inline. In Ballista's shuffle writer, this blocks tokio worker threads because file I/O happens inside the closure.
The workaround is to move the entire partition call into spawn_blocking, but that also moves the CPU-bound partitioning work off the tokio workers, which is wasteful.
partition_iter already exists as a private method that returns an iterator of Result<(partition_index, RecordBatch)> items. DataFusion's own RepartitionExec uses it directly, iterating over the results and sending them through async channels. The doc comment on the method says this separation was intentional:
"we need to have a variant of partition that works w/ sync functions, and one that works w/ async. Using an iterator as an intermediate representation was the best way to achieve this"
But since partition_iter is private, downstream crates can't use this pattern.
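The design that doc comment describes, one iterator-producing core with the closure-based method as a thin wrapper over it, can be sketched in plain Rust. This is a toy model under stated assumptions, not DataFusion's implementation: ToyPartitioner, the modulo "hash", Vec<u64> standing in for RecordBatch, and String standing in for DataFusionError are all hypothetical. Only the shape of the two signatures mirrors the ones discussed in this issue.

```rust
/// Hypothetical stand-in for a RecordBatch: one column of row keys.
type Batch = Vec<u64>;
/// Hypothetical stand-in for DataFusionError.
type Error = String;

/// Toy hash partitioner (hypothetical; not DataFusion's BatchPartitioner).
struct ToyPartitioner {
    num_partitions: usize,
}

impl ToyPartitioner {
    /// Core logic: split one input batch into per-partition batches and yield
    /// (partition_index, batch) pairs. Returning an iterator leaves it to the
    /// caller what happens to each pair: a sync closure, a channel send, etc.
    fn partition_iter(
        &mut self,
        batch: Batch,
    ) -> Result<impl Iterator<Item = Result<(usize, Batch), Error>> + Send + '_, Error> {
        if self.num_partitions == 0 {
            return Err("num_partitions must be > 0".to_string());
        }
        let mut buckets: Vec<Batch> = vec![Vec::new(); self.num_partitions];
        for key in batch {
            // Modulo stands in for real hashing of the partition columns.
            buckets[(key % self.num_partitions as u64) as usize].push(key);
        }
        Ok(buckets
            .into_iter()
            .enumerate()
            // Skip partitions that received no rows from this batch.
            .filter(|(_, rows)| !rows.is_empty())
            .map(Ok::<(usize, Batch), Error>))
    }

    /// Sync variant: a thin wrapper that feeds every pair to a closure.
    fn partition<F>(&mut self, batch: Batch, mut f: F) -> Result<(), Error>
    where
        F: FnMut(usize, Batch) -> Result<(), Error>,
    {
        for item in self.partition_iter(batch)? {
            let (partition, rows) = item?;
            f(partition, rows)?;
        }
        Ok(())
    }
}
```

Because partition only consumes partition_iter, exposing the iterator would give callers the same logic without duplicating it: they could loop over the pairs and forward them to a channel instead of handing them to a closure.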
Describe the solution you'd like
Make partition_iter public (or add a public equivalent). The signature is already suitable:
pub fn partition_iter(
    &mut self,
    batch: RecordBatch,
) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_>
This lets consumers partition on the async side and only push the I/O into spawn_blocking:
// async side: CPU-bound partitioning stays on the tokio worker
for result in partitioner.partition_iter(input_batch)? {
    let (partition, batch) = result?;
    tx.send((partition, batch)).await?;
}
// blocking side (inside spawn_blocking): only the file I/O
while let Some((partition, batch)) = rx.blocking_recv() {
    writers[partition].write(&batch)?;
}
Describe alternatives you've considered
In apache/datafusion-ballista#1537 we moved both partitioning and I/O into spawn_blocking together. It works, but it leaves performance on the table by running the CPU-bound partitioning on the blocking pool instead of on the tokio workers.
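For contrast with that alternative, the split this issue proposes (partitioning stays on the calling side, only writes go to a blocking thread) can be modeled with the standard library alone. This is a minimal sketch, not Ballista's or DataFusion's code, and every name in it is hypothetical: std::thread::spawn stands in for tokio::task::spawn_blocking, std::sync::mpsc::sync_channel for a bounded tokio::sync::mpsc channel, Vec<u64> for RecordBatch, and the modulo hashing and in-memory "files" are placeholders for real hash partitioning and file writers.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for a RecordBatch.
type Batch = Vec<u64>;

/// Partitions `inputs` on the calling thread and hands each (partition, batch)
/// pair to a dedicated writer thread over a bounded channel. Returns the
/// per-partition "files" (in-memory Vecs standing in for real file writers).
fn partition_and_write(inputs: Vec<Batch>, num_partitions: usize) -> Vec<Vec<u64>> {
    assert!(num_partitions > 0, "num_partitions must be > 0");

    // Bounded channel: send() blocks when 4 pairs are queued, giving backpressure.
    let (tx, rx) = sync_channel::<(usize, Batch)>(4);

    // "Blocking side": owns the writers and performs all the (simulated) I/O.
    let writer = thread::spawn(move || {
        let mut files: Vec<Vec<u64>> = vec![Vec::new(); num_partitions];
        // recv() returns Err once the sender has been dropped.
        while let Ok((partition, batch)) = rx.recv() {
            files[partition].extend_from_slice(&batch);
        }
        files
    });

    // "Async side" (here just the calling thread): CPU-bound partitioning only.
    for batch in inputs {
        let mut buckets: Vec<Batch> = vec![Vec::new(); num_partitions];
        for key in batch {
            buckets[(key % num_partitions as u64) as usize].push(key);
        }
        for (partition, rows) in buckets.into_iter().enumerate() {
            if !rows.is_empty() {
                tx.send((partition, rows)).expect("writer thread hung up");
            }
        }
    }

    // Dropping the sender closes the channel so the writer loop ends.
    drop(tx);
    writer.join().expect("writer thread panicked")
}
```

The bounded channel is the design choice that matters here: if the writer falls behind, the partitioning side waits (in the tokio version, send(...).await yields instead of blocking), so partitioned batches cannot accumulate in memory without limit.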
Additional context
Ballista PR: apache/datafusion-ballista#1537