Optimize window function with intra-operator parallelism and vectorization #22355

@2010YOUY01

Is your feature request related to a problem or challenge?

Background

For a refresher on window function definitions, see Ch.2 of the following paper:
https://www.vldb.org/pvldb/vol8/p1058-leis.pdf

Currently, DataFusion has two ExecutionPlan implementations for window functions. Both assume that repartitioning and ordering requirements are satisfied outside the window operator itself (the optimizer inserts RepartitionExec and SortExec on the input side of the window operator). They differ in execution strategy:

  • WindowAggExec: fallback implementation. It caches the entire input, identifies partition boundaries, then evaluates window partitions one by one.
  • BoundedWindowAggExec: memory-efficient variant. When the window frame is bounded, it only retains the batches required to evaluate the current window row.

Pain Points

Both existing implementations only support parallelism through repartitioning. This works well when the window expression includes PARTITION BY and the partition cardinality is high enough to utilize all CPU cores.

For queries like:

-- Only one global partition
SELECT
  v1,
  avg(v1) OVER (
    ORDER BY v1
    ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
  ) AS moving_avg
FROM generate_series(1000000000) AS t1(v1);

only a single CPU core performs the work.

Proposed Solution

For the above workload, which uses a simple aggregate function with a fixed ROWS window frame, a possible solution would be:

  • Introduce intra-operator parallelism: each worker evaluates a separate chunk of rows.
  • Improve memory efficiency by incrementally loading input batches.
  • Introduce a vectorized aggregate API: evaluate window expression columns in batches so there is no per-row function-call overhead when adjusting frame boundaries.
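
As a rough illustration of the first and third bullets (this is not the PoC itself), the sketch below splits one global partition into contiguous row chunks and evaluates each chunk on its own thread. It assumes the input is already sorted and held in a single in-memory slice, so it does not show incremental batch loading. The names `moving_avg_chunk` and `parallel_moving_avg` are hypothetical, not DataFusion APIs. Each worker reads up to `k` rows beyond its chunk on either side, so frames near chunk edges see the same rows as in a serial run.

```rust
use std::thread;

/// Hypothetical kernel: average over ROWS BETWEEN `k` PRECEDING AND `k` FOLLOWING
/// for output rows `start..end`. Reads up to `k` extra input rows on each side.
fn moving_avg_chunk(input: &[f64], start: usize, end: usize, k: usize) -> Vec<f64> {
    assert!(start <= end && end <= input.len());
    let n = input.len();
    let mut out = Vec::with_capacity(end - start);
    if start == end {
        return out;
    }
    // Frame of the first output row, as the half-open range [lo, hi).
    let mut lo = start.saturating_sub(k);
    let mut hi = start.saturating_add(k + 1).min(n);
    let mut sum: f64 = input[lo..hi].iter().sum();
    for row in start..end {
        let new_lo = row.saturating_sub(k);
        let new_hi = row.saturating_add(k + 1).min(n);
        // Slide the frame: add rows entering on the right, drop rows leaving on the left.
        while hi < new_hi {
            sum += input[hi];
            hi += 1;
        }
        while lo < new_lo {
            sum -= input[lo];
            lo += 1;
        }
        out.push(sum / (hi - lo) as f64);
    }
    out
}

/// Hypothetical driver: one scoped thread per contiguous chunk of output rows.
fn parallel_moving_avg(input: &[f64], k: usize, workers: usize) -> Vec<f64> {
    let n = input.len();
    let workers = workers.max(1);
    let chunk = ((n + workers - 1) / workers).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = (0..n)
            .step_by(chunk)
            .map(|start| {
                let end = (start + chunk).min(n);
                s.spawn(move || moving_avg_chunk(input, start, end, k))
            })
            .collect();
        // Joining in spawn order keeps the output rows in input order.
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect::<Vec<f64>>()
    })
}
```

With `k = 3` this matches the `ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING` frame in the query above. The running sum replaces a per-row frame re-scan with one add and one subtract per row; for floating-point input it can accumulate rounding error, which a real kernel would need to handle.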

Here is a PoC targeting the above workload

Implementation Plan

WIP, I'm still thinking about how to design the common operator state machine and window expr API. Here is the challenge:

Making a single workload fast is relatively straightforward. The hard part is designing a simple, general interface that is extensible to different optimizations.

Here are some different window expr types:

  • Window frames such as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  • Aggregate functions such as max(x)
  • Dynamic frame expressions such as ROWS BETWEEN v1 PRECEDING AND v2 FOLLOWING
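
To make the first frame type concrete: a running `sum(x)` over `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` could be parallelized within one partition using a two-pass chunked prefix scan. This is only one possible strategy, sketched under the assumption that the input fits in a single in-memory slice; `parallel_running_sum` is a hypothetical name, not an existing DataFusion function.

```rust
use std::thread;

/// Hypothetical kernel: running sum over a single partition, computed in two passes.
fn parallel_running_sum(input: &[i64], workers: usize) -> Vec<i64> {
    let workers = workers.max(1);
    let chunk = ((input.len() + workers - 1) / workers).max(1);

    // Pass 1: each worker computes the total of its own chunk.
    let totals: Vec<i64> = thread::scope(|s| {
        let handles: Vec<_> = input
            .chunks(chunk)
            .map(|c| s.spawn(move || c.iter().sum::<i64>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Exclusive scan over the chunk totals: the sum of all rows before each chunk.
    // This step is sequential but only touches one value per chunk.
    let mut offsets = Vec::with_capacity(totals.len());
    let mut acc = 0i64;
    for t in &totals {
        offsets.push(acc);
        acc += t;
    }

    // Pass 2: each worker fills its own disjoint slice of the output,
    // starting its running sum from the offset of its chunk.
    let mut out = vec![0i64; input.len()];
    thread::scope(|s| {
        let pairs = input.chunks(chunk).zip(out.chunks_mut(chunk));
        for ((src, dst), offset) in pairs.zip(offsets) {
            s.spawn(move || {
                let mut running = offset;
                for (x, y) in src.iter().zip(dst.iter_mut()) {
                    running += x;
                    *y = running;
                }
            });
        }
    });
    out
}
```

The input is read twice, but both passes are independent per chunk, so the only serial work is the scan over chunk totals.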

Most of these workloads can support intra-partition parallelism, but the implementation strategy differs significantly. For example, window frames with highly dynamic ranges may benefit from segment-tree-based approaches as described in the paper above.
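
For reference, here is a minimal sketch of the segment-tree idea applied to `max(x)` with per-row frame bounds, as in `ROWS BETWEEN v1 PRECEDING AND v2 FOLLOWING`. It is a generic iterative range-maximum tree, not code from the paper or from DataFusion; `MaxSegmentTree` and `dynamic_frame_max` are hypothetical names, and the per-row bounds are assumed to arrive as plain `usize` slices.

```rust
/// Iterative segment tree over i64 values supporting range-maximum queries.
struct MaxSegmentTree {
    n: usize,
    tree: Vec<i64>,
}

impl MaxSegmentTree {
    /// Builds the tree in O(n): leaves live in tree[n..2n], parents in tree[1..n].
    fn new(values: &[i64]) -> Self {
        let n = values.len();
        let mut tree = vec![i64::MIN; 2 * n];
        tree[n..].copy_from_slice(values);
        for i in (1..n).rev() {
            tree[i] = tree[2 * i].max(tree[2 * i + 1]);
        }
        Self { n, tree }
    }

    /// Maximum of values[lo..hi) in O(log n), or None for an empty or invalid range.
    fn range_max(&self, lo: usize, hi: usize) -> Option<i64> {
        if lo >= hi || hi > self.n {
            return None;
        }
        let (mut l, mut r) = (lo + self.n, hi + self.n);
        let mut best = i64::MIN;
        while l < r {
            if l & 1 == 1 {
                best = best.max(self.tree[l]);
                l += 1;
            }
            if r & 1 == 1 {
                r -= 1;
                best = best.max(self.tree[r]);
            }
            l >>= 1;
            r >>= 1;
        }
        Some(best)
    }
}

/// Hypothetical kernel: max over a frame whose bounds differ for every row.
fn dynamic_frame_max(values: &[i64], preceding: &[usize], following: &[usize]) -> Vec<i64> {
    let tree = MaxSegmentTree::new(values);
    (0..values.len())
        .map(|i| {
            // Clamp the frame to the partition, as ROWS frames do at the edges.
            let lo = i.saturating_sub(preceding[i]);
            let hi = i.saturating_add(following[i]).saturating_add(1).min(values.len());
            tree.range_max(lo, hi).expect("frame always contains the current row")
        })
        .collect()
}
```

Once the tree is built, every row's query is independent of the others, so the per-row loop can be split across workers in the same way as the fixed-frame case.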

I'm prototyping other workload types to figure out the right design, so that acceleration kernels can easily be added for window exprs with different characteristics.

Labels: enhancement (New feature or request)
