Is your feature request related to a problem or challenge?
Background
For a refresher on window function definitions, see Ch.2 of the following paper:
https://www.vldb.org/pvldb/vol8/p1058-leis.pdf
Currently, DataFusion has two ExecutionPlan implementations for window functions. Both assume that repartitioning and ordering requirements are satisfied outside the window operator itself (the optimizer inserts RepartitionExec and SortExec on the input side of the window operator). They differ in execution strategy:
- WindowAggExec: fallback implementation. It caches the entire input, identifies partition boundaries, then evaluates window partitions one by one.
- BoundedWindowAggExec: memory-efficient variant. When the window frame is bounded, it only retains the batches required to evaluate the current window row.
Pain Points
Both existing implementations only support parallelism through repartitioning. This works well when the window expression includes PARTITION BY and the partition cardinality is high enough to utilize all CPU cores.
For queries like:
-- Only one global partition
SELECT
    v1,
    avg(v1) OVER (
        ORDER BY v1
        ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
    ) AS moving_avg
FROM generate_series(1000000000) AS t1(v1);
only a single CPU core performs the work.
Proposed Solution
For the above workload, which uses a simple aggregate function with a fixed ROWS window frame, a possible solution would be:
- Introduce intra-operator parallelism: each worker evaluates a separate chunk of rows.
- Improve memory efficiency by incrementally loading input batches.
- Introduce a vectorized aggregate API: evaluate window expression columns in batches so there is no per-row function-call overhead when adjusting frame boundaries.
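To make the first bullet concrete, here is a minimal Rust sketch of chunked evaluation for a fixed ROWS frame. It is an illustration only and is independent of any actual PoC code: the function names (`moving_avg_range`, `parallel_moving_avg`), the use of a plain `&[f64]` slice instead of Arrow batches, and the use of `std::thread::scope` are all assumptions made for this example. Each worker owns a contiguous range of output rows and reads a few extra input rows on either side of its range, so workers never need to coordinate.

```rust
use std::thread;

/// Average over the frame `[i - preceding, i + following]` (clamped to the
/// input bounds) for every row `i` in `start..end`.
/// The running sum is adjusted incrementally as the frame slides.
fn moving_avg_range(
    values: &[f64],
    start: usize,
    end: usize,
    preceding: usize,
    following: usize,
) -> Vec<f64> {
    let n = values.len();
    let mut out = Vec::with_capacity(end.saturating_sub(start));
    if start >= end {
        return out;
    }
    // Build the frame for the first row of this range from scratch.
    let mut lo = start.saturating_sub(preceding);
    let mut hi = (start + following).min(n - 1); // inclusive upper bound
    let mut sum: f64 = values[lo..=hi].iter().sum();
    out.push(sum / (hi - lo + 1) as f64);

    for i in start + 1..end {
        // The frame moves by at most one row on each side per step.
        let new_lo = i.saturating_sub(preceding);
        if new_lo > lo {
            sum -= values[lo];
            lo = new_lo;
        }
        let new_hi = (i + following).min(n - 1);
        if new_hi > hi {
            hi = new_hi;
            sum += values[hi];
        }
        out.push(sum / (hi - lo + 1) as f64);
    }
    out
}

/// Hypothetical driver: splits the output rows into contiguous chunks and
/// evaluates each chunk on its own scoped thread.
fn parallel_moving_avg(
    values: &[f64],
    preceding: usize,
    following: usize,
    workers: usize,
) -> Vec<f64> {
    let n = values.len();
    if n == 0 {
        return Vec::new();
    }
    let chunk = (n + workers.max(1) - 1) / workers.max(1); // ceiling division
    thread::scope(|s| {
        // Spawn all workers first, then join them in input order.
        let handles: Vec<_> = (0..n)
            .step_by(chunk)
            .map(|start| {
                let end = (start + chunk).min(n);
                s.spawn(move || moving_avg_range(values, start, end, preceding, following))
            })
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("worker panicked"))
            .collect()
    })
}
```

Calling `parallel_moving_avg(&values, 3, 3, 4)` mirrors the frame in the query above with four workers. A real operator would additionally have to stream input batches and respect partition boundaries, which this sketch ignores.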
Here is a PoC targeting the above workload
Implementation Plan
WIP, I'm still thinking about how to design the common operator state machine and window expr API. Here is the challenge:
Making a single workload fast is relatively straightforward. The hard part is designing a simple, general interface that is extensible to different optimizations.
Here are some different window expr types:
- Window frames such as
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
- Aggregate functions such as
max(x)
- Dynamic frame expressions such as
ROWS BETWEEN v1 PRECEDING AND v2 FOLLOWING
Most of these workloads can support intra-partition parallelism, but the implementation strategy differs significantly. For example, window frames with highly dynamic ranges may benefit from segment-tree-based approaches as described in the paper above.
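As a rough illustration of the segment-tree idea for `max(x)` with per-row frame bounds, here is a minimal Rust sketch. The type and function names (`MaxSegmentTree`, `max_over_dynamic_frames`) are hypothetical, the input is a plain `&[i64]` slice rather than Arrow arrays, and NULL handling is ignored. It is a textbook iterative segment tree, not necessarily the exact variant described in the paper.

```rust
/// Iterative segment tree over `i64` values that answers range-max queries.
struct MaxSegmentTree {
    size: usize,    // number of leaves, rounded up to a power of two
    tree: Vec<i64>, // tree[1] is the root; leaves start at index `size`
}

impl MaxSegmentTree {
    fn new(values: &[i64]) -> Self {
        let size = values.len().next_power_of_two();
        // Unused leaves hold i64::MIN, the identity element for max.
        let mut tree = vec![i64::MIN; 2 * size];
        tree[size..size + values.len()].copy_from_slice(values);
        for i in (1..size).rev() {
            tree[i] = tree[2 * i].max(tree[2 * i + 1]);
        }
        Self { size, tree }
    }

    /// Maximum over the half-open row range `[lo, hi)`, or `None` if empty.
    /// Runs in O(log n) regardless of how wide the frame is.
    fn query(&self, lo: usize, hi: usize) -> Option<i64> {
        if lo >= hi {
            return None;
        }
        let (mut l, mut r) = (lo + self.size, hi + self.size);
        let mut best = i64::MIN;
        while l < r {
            if l & 1 == 1 {
                best = best.max(self.tree[l]);
                l += 1;
            }
            if r & 1 == 1 {
                r -= 1;
                best = best.max(self.tree[r]);
            }
            l >>= 1;
            r >>= 1;
        }
        Some(best)
    }
}

/// Evaluates `max(x) OVER (ROWS BETWEEN p PRECEDING AND f FOLLOWING)` where
/// `p` and `f` vary per row. Frames are clamped to the input bounds.
fn max_over_dynamic_frames(values: &[i64], preceding: &[usize], following: &[usize]) -> Vec<i64> {
    assert_eq!(values.len(), preceding.len());
    assert_eq!(values.len(), following.len());
    let n = values.len();
    let tree = MaxSegmentTree::new(values);
    (0..n)
        .map(|i| {
            let lo = i.saturating_sub(preceding[i]);
            let hi = i.saturating_add(following[i]).min(n - 1) + 1;
            tree.query(lo, hi).expect("frame always contains the current row")
        })
        .collect()
}
```

Once the tree is built it is read-only and every row's query is independent, so the rows could in principle be split across workers in the same way as for fixed frames. Whether that pays off compared with incremental frame adjustment depends on how much the frame bounds jump between adjacent rows.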
I'm prototyping other workload types to figure out the right design, so that we can easily add acceleration kernels for window exprs with different characteristics.
Describe the solution you'd like
No response
Describe alternatives you've considered
No response
Additional context
No response