feat: Create dynamic filters in SortMergeJoin#21267
feat: Create dynamic filters in SortMergeJoin#21267stuhood wants to merge 5 commits intoapache:mainfrom
Conversation
|
Supersedes #20455. |
ff61597 to
d7dcdd5
Compare
d7dcdd5 to
155f789
Compare
|
run benchmark tpch tpcds |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing stuhood.smj-dynamic-filter-creation (155f789) to ccaf802 (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing stuhood.smj-dynamic-filter-creation (155f789) to ccaf802 (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpch — base (merge-base)
tpch — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
|
🚀🚀🚀 |
adriangb
left a comment
There was a problem hiding this comment.
Thank you for working on this. Can we split out the fixes for the other nodes into it's own PR that we can fast track (and maybe discuss any API improvements in)?
Then we can look at the SPM in more detail.
| if !matches!(phase, FilterPushdownPhase::Pre) { | ||
| let mut result = FilterPushdownPropagation::if_all(child_pushdown_result); | ||
| if let Some(updated_child) = result.updated_node { | ||
| let mut new_self = self.clone(); | ||
| new_self.input = updated_child; | ||
| result.updated_node = Some(Arc::new(new_self) as _); | ||
| } | ||
| return Ok(result); |
There was a problem hiding this comment.
Good catch! This is definitely a footgun. Any thoughts on how we can make the API less error prone? It would also be nice to put this into its own PR so we can fast track it.
| .iter() | ||
| .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) | ||
| .collect(), | ||
| parent_filters, |
| ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> { | ||
| let mut result = FilterPushdownPropagation::if_all(child_pushdown_result); | ||
| if let Some(updated_child) = result.updated_node { | ||
| result.updated_node = Some(Arc::new(GlobalLimitExec::new( |
There was a problem hiding this comment.
| result.updated_node = Some(Arc::new(GlobalLimitExec::new( | |
| result.updated_node = Some(Arc::new(Self::new( |
?
| let mut result = FilterPushdownPropagation::if_all(child_pushdown_result); | ||
| if let Some(updated_child) = result.updated_node { | ||
| result.updated_node = | ||
| Some(Arc::new(LocalLimitExec::new(updated_child, self.fetch)) as _); |
| let mut result = FilterPushdownPropagation::if_all(child_pushdown_result); | ||
| if let Some(updated_child) = result.updated_node { | ||
| let mut new_self = self.clone(); | ||
| new_self.input = updated_child; | ||
| result.updated_node = Some(Arc::new(new_self) as _); | ||
| } | ||
| Ok(result) |
| Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) | ||
| let mut result = FilterPushdownPropagation::if_all(child_pushdown_result); | ||
| if let Some(updated_child) = result.updated_node { | ||
| let mut new_self = self.clone(); | ||
| new_self.input = updated_child; | ||
| result.updated_node = Some(Arc::new(new_self) as _); | ||
| } | ||
| Ok(result) | ||
| } |
| fn handle_child_pushdown_result( | ||
| &self, | ||
| _phase: FilterPushdownPhase, | ||
| child_pushdown_result: ChildPushdownResult, | ||
| _config: &datafusion_common::config::ConfigOptions, | ||
| ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> { | ||
| let mut result = FilterPushdownPropagation::if_all(child_pushdown_result); | ||
| if let Some(updated_child) = result.updated_node { | ||
| let mut new_self = self.cloned(); | ||
| new_self.input = updated_child; | ||
| result.updated_node = Some(Arc::new(new_self) as _); | ||
| } | ||
| Ok(result) | ||
| } |
There was a problem hiding this comment.
Really need to eliminate this foot gun 🦶🏻
| Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) | ||
| let mut result = FilterPushdownPropagation::if_all(child_pushdown_result); | ||
| if let Some(updated_child) = result.updated_node { | ||
| let mut new_self = self.clone(); | ||
| new_self.input = updated_child; | ||
| result.updated_node = Some(Arc::new(new_self) as _); | ||
| } | ||
| Ok(result) |
Which issue does this PR close?
SortMergeJoinExec#20443.Rationale for this change
This change fixes #20443, and adds support for generating dynamic filters for the left and right sides of a
SortMergeJoin, in order to allow for range-based pruning of both sides of the join. Some consumers of the dynamic filter may even be able to seek/skip ahead on their inputs.What changes are included in this PR?
To allow
SortMergeJointo actually propagate dynamic filters down to scans and get things working end to end, it was necessary to fix thehandle_child_pushdown_resultimplementations of a variety of nodes which were failing to clone and update themselves.Are these changes tested?
Yes.
Are there any user-facing changes?
Explain will now show dynamic filters for
SortMergeJoin, and for the scans that they consume.