diff --git a/docs-mintlify/reference/configuration/environment-variables.mdx b/docs-mintlify/reference/configuration/environment-variables.mdx index d90e2fdea7651..b618c116ed00a 100644 --- a/docs-mintlify/reference/configuration/environment-variables.mdx +++ b/docs-mintlify/reference/configuration/environment-variables.mdx @@ -116,8 +116,8 @@ If you'd like to adjust the concurrency for the refresh worker, use ## `CUBEJS_REFRESH_WORKER_CONCURRENCY` -The number of concurrent connections each query queue of the refresh worker -has to the database. +The number of pre-aggregation queries the refresh worker executes concurrently +against the source database. | Possible Values | Default in Development | Default in Production | | --------------- | ---------------------------- | ---------------------------- | diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index e96753f27c589..34c2ef0a7e3a1 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -4489,6 +4489,7 @@ export class BaseQuery { ') AS {{ from_alias }}{% elif from_prepared %}\n' + 'FROM {{ from_prepared }}' + '{% endif %}' + + '{% for join in joins %}\n{{ join }}{% endfor %}' + '{% if filter %}\nWHERE {{ filter }}{% endif %}' + '{% if group_by %}\nGROUP BY {{ group_by }}{% endif %}' + '{% if having %}\nHAVING {{ having }}{% endif %}' + diff --git a/packages/cubejs-schema-compiler/src/adapter/PrestodbQuery.ts b/packages/cubejs-schema-compiler/src/adapter/PrestodbQuery.ts index ef6f1548ff5c6..193f95aee08d6 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PrestodbQuery.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PrestodbQuery.ts @@ -148,6 +148,7 @@ export class PrestodbQuery extends BaseQuery { 'FROM (\n {{ from }}\n) AS {{ from_alias }} {% elif from_prepared %}\n' + 'FROM {{ from_prepared }}' + '{% endif %}' + + '{% for join in joins %}\n{{ join }}{% endfor %}' + '{% if filter %}\nWHERE {{ filter }}{% endif %}' + '{% if group_by %} GROUP BY {{ group_by }}{% endif %}' + '{% if having %}\nHAVING {{ having }}{% endif %}' + diff --git a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs index c30614fb78553..eed8710df66a0 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs @@ -44,6 +44,7 @@ use std::{ convert::TryInto, fmt, future::Future, + mem::take, pin::Pin, result, sync::{Arc, LazyLock}, @@ -345,6 +346,14 @@ fn expr_name(e: &Expr, schema: &DFSchema) -> Result { } } +/// Holds information on whether a column name has been referenced once or multiple times +/// with different values; this is useful to resolve conflicts when remapping columns +/// with non-matching qualifiers. +enum ColumnReferenceState { + Single(Column), + Multiple, +} + /// Holds column remapping for generated SQL /// Can be used to remap expression in logical plans on top, /// and to generate mapping between schema and Cube load query in wrapper @@ -381,7 +390,47 @@ impl ColumnRemapping { } pub fn extend(&mut self, other: ColumnRemapping) { - self.column_remapping.extend(other.column_remapping); + // Let's collect the map again, obtaining conflicting column names and merging on the fly. + // We will need this to remove conflicting unqualified mappings when merging. + let mut conflicting_columns = HashMap::new(); + let old_column_remapping = take(&mut self.column_remapping); + + for (from, to) in old_column_remapping + .into_iter() + .chain(other.column_remapping) + { + let key = from.name.to_string(); + match conflicting_columns.get(&key) { + None => { + // No conflicting columns, keep current value and mark as single reference + conflicting_columns.insert(key, ColumnReferenceState::Single(to.clone())); + self.column_remapping.insert(from, to); + } + Some(ColumnReferenceState::Single(single_to)) => { + if single_to == &to { + // Same target column, no conflict + self.column_remapping.insert(from, to); + continue; + } + + // Target is different, so we have a conflict now + conflicting_columns.insert(key, ColumnReferenceState::Multiple); + // Remove unqualified mapping as it's now conflicting + self.column_remapping + .remove(&Column::from_name(from.name.clone())); + // Keep current mapping if it's qualified + if from.relation.is_some() { + self.column_remapping.insert(from, to); + } + } + Some(ColumnReferenceState::Multiple) => { + // Already marked as multiple, insert current value only if it's qualified + if from.relation.is_some() { + self.column_remapping.insert(from, to); + } + } + } + } } } @@ -392,6 +441,7 @@ struct Remapper { can_rename_columns: bool, remapping: HashMap, used_targets: HashSet, + used_columns: HashMap, } impl Remapper { @@ -407,6 +457,7 @@ impl Remapper { remapping: HashMap::new(), used_targets: HashSet::new(), + used_columns: HashMap::new(), } } @@ -462,11 +513,38 @@ impl Remapper { }; self.used_targets.insert(new_alias.clone()); - self.remapping.insert( - Column::from_name(&original_column.name), - target_column.clone(), - ); + let unqualified_column = Column::from_name(&original_column.name); + let reference_state = self.used_columns.get(&original_column.name); + match reference_state { + None => { + // No columns with this name yet, mark as single reference + self.remapping + .insert(unqualified_column, target_column.clone()); + self.used_columns.insert( + original_column.name.clone(), + ColumnReferenceState::Single(original_column.clone()), + ); + } + Some(ColumnReferenceState::Single(ref_column)) => { + // Already have a single reference, check if the target is the same + // and remove unqualified variant if not + if ref_column != &target_column { + self.remapping.remove(&unqualified_column); + self.used_columns + .insert(original_column.name.clone(), ColumnReferenceState::Multiple); + } + } + Some(ColumnReferenceState::Multiple) => { + // Already marked as multiple, nothing to do + } + } + if let Some(from_alias) = &self.from_alias { + // Always map the column under the new from_alias so plans above (which see + // this remapper's output through `from_alias`) can resolve it. When the + // source column already had a different relation, also keep a mapping under + // that original relation, so plans that still reference the inner qualifier + // continue to resolve. self.remapping.insert( Column { name: original_column.name.clone(), @@ -506,7 +584,11 @@ impl Remapper { expr: &Expr, ) -> result::Result { let original_alias = expr_name(original_expr, schema)?; - let original_alias_key = Column::from_name(&original_alias); + let original_alias_key = if let Expr::Column(column) = &original_expr { + column.clone() + } else { + Column::from_name(&original_alias) + }; if let Some(alias_column) = self.remapping.get(&original_alias_key) { return Ok(alias_column.name.clone()); } @@ -587,6 +669,7 @@ macro_rules! generate_sql_for_timestamp { }; } +#[derive(Debug)] struct GeneratedColumns { projection: Vec<(AliasedColumn, HashSet)>, group_by: Vec<(AliasedColumn, HashSet)>, @@ -881,6 +964,7 @@ impl CubeScanWrapperNode { .get_sql_templates() .select( new_sql.sql.to_string(), + vec![], columns, vec![], vec![], @@ -3621,7 +3705,7 @@ impl WrappedSelectNode { let SqlGenerationResult { data_source, from_alias, - column_remapping, + mut column_remapping, mut sql, request, } = CubeScanWrapperNode::generate_sql_for_node_rec( @@ -3636,6 +3720,91 @@ impl WrappedSelectNode { ) .await?; + let mut joins_sql = Vec::with_capacity(self.joins.len()); + for (join_plan, join_condition, join_type) in &self.joins { + let SqlGenerationResult { + data_source, + from_alias, + column_remapping: join_column_remapping, + sql: join_sql, + request: _, + } = CubeScanWrapperNode::generate_sql_for_node_rec( + meta, + Arc::clone(&transport), + Arc::clone(&load_request_meta), + Arc::clone(&state), + Arc::clone(join_plan), + true, + values.clone(), + parent_data_source, + ) + .await?; + + let Some(from_alias) = from_alias else { + return Err(CubeError::internal( + "Can't generate SQL for wrapped select: join subquery has no alias".to_string(), + )); + }; + + let Some(data_source) = data_source else { + return Err(CubeError::internal(format!( + "Can't generate SQL for wrapped select: no data source for {:?}", + node + ))); + }; + + let generator = Arc::clone( + meta.data_source_to_sql_generator + .get(&data_source) + .ok_or_else(|| { + CubeError::internal(format!( + "Can't generate SQL for wrapped select: no sql generator for {:?}", + node + )) + })?, + ); + + let join_type_sql = match join_type { + JoinType::Left => generator.get_sql_templates().left_join()?, + JoinType::Inner => generator.get_sql_templates().inner_join()?, + _ => { + return Err(CubeError::internal(format!( + "Unsupported join type for join subquery: {join_type:?}" + ))) + } + }; + + let (join_condition_sql, join_sql) = Self::generate_sql_for_expr( + join_sql, + generator.clone(), + join_condition.clone(), + None, + &HashMap::new(), + ) + .await?; + + let (join_sql_str, new_values) = join_sql.unpack(); + sql.extend_values(new_values); + if let Some(join_column_remapping) = join_column_remapping { + if let Some(column_remapping) = column_remapping.as_mut() { + column_remapping.extend(join_column_remapping); + } else { + column_remapping = Some(join_column_remapping); + } + }; + + let aliased_join_sql = generator + .get_sql_templates() + .query_aliased(&join_sql_str, &from_alias)?; + + let final_join_sql = generator.get_sql_templates().join( + &join_type_sql, + &aliased_join_sql, + &join_condition_sql, + )?; + joins_sql.push(final_join_sql); + } + let subqueries_sql = self .prepare_subqueries_sql( meta, @@ -3688,6 +3857,7 @@ impl WrappedSelectNode { .get_sql_templates() .select( sql.sql.to_string(), + joins_sql, projection.into_iter().map(|(m, _)| m).collect(), group_by.into_iter().map(|(m, _)| m).collect(), group_descs, diff --git a/rust/cubesql/cubesql/src/compile/mod.rs b/rust/cubesql/cubesql/src/compile/mod.rs index 2e7f84885c7f1..64f261c2f8e7b 100644 --- a/rust/cubesql/cubesql/src/compile/mod.rs +++ b/rust/cubesql/cubesql/src/compile/mod.rs @@ -18818,4 +18818,58 @@ LIMIT {{ limit }}{% endif %}"#.to_string(), Ok(()) } + + #[tokio::test] + async fn test_grouped_join_grouped_sql_push_down() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + r#" + WITH t1 AS ( + SELECT DISTINCT customer_gender + FROM KibanaSampleDataEcommerce + WHERE has_subscription = TRUE + ), + t2 AS ( + SELECT + customer_gender, + COUNT(DISTINCT id) AS user_count, + COUNT( + DISTINCT CASE + WHEN has_subscription = TRUE THEN id + END + ) AS subscribed_user_count + FROM KibanaSampleDataEcommerce + WHERE customer_gender IS NOT NULL + GROUP BY 1 + ) + SELECT + t1.customer_gender, + t2.user_count, + t2.subscribed_user_count + FROM + t1 AS t1, + t2 AS t2 + WHERE t1.customer_gender = t2.customer_gender + ORDER BY t2.user_count DESC + LIMIT 5000 + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let logical_plan = query_plan.as_logical_plan(); + let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql; + assert!(sql.contains("INNER JOIN (")); + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + } } diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs index 179145266042e..71bdbda8ac473 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/join.rs @@ -10,7 +10,7 @@ use crate::{ wrapped_select_subqueries_empty_tail, wrapped_select_window_expr_empty_tail, wrapper_pullup_replacer, wrapper_pushdown_replacer, wrapper_replacer_context, BinaryExprOp, ColumnExprColumn, CubeEGraph, JoinLeftOn, JoinRightOn, LogicalPlanLanguage, - WrappedSelectJoinJoinType, WrapperReplacerContextAliasToCube, + WrappedSelectJoinJoinType, WrappedSelectPushToCube, WrapperReplacerContextAliasToCube, WrapperReplacerContextGroupedSubqueries, }, var, var_iter, var_list_iter, @@ -268,6 +268,225 @@ impl WrapperRules { "?out_grouped_subqueries", ), ), + // TODO handle CrossJoin and Filter(CrossJoin) as well + transforming_rewrite( + "wrapper-push-down-grouped-join-grouped", + join( + cube_scan_wrapper( + wrapper_pullup_replacer( + "?left_input", + wrapper_replacer_context( + // Going to use this in RHS of rule + "?left_alias_to_cube", + // Push-to-Cube can have any value when both sides are grouped + "?left_push_to_cube", + "?left_in_projection", + // Going to use this in RHS of rule + "?left_cube_members", + "?left_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + // Data sources must match for both sides + // TODO support unrestricted data source on one side + "?input_data_source", + ), + ), + "CubeScanWrapperFinalized:false", + ), + cube_scan_wrapper( + wrapper_pullup_replacer( + "?right_input", + wrapper_replacer_context( + // Going to ignore this + "?right_alias_to_cube", + "?right_push_to_cube", + "?right_in_projection", + // Going to ignore this + "?right_cube_members", + "?right_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + // Data sources must match for both sides + // TODO support unrestricted data source on one side + "?input_data_source", + ), + ), + "CubeScanWrapperFinalized:false", + ), + "?left_on", + "?right_on", + "?in_join_type", + "?join_constraint", + "JoinNullEqualsNull:false", + ), + // RHS is using WrapperReplacerContextInProjection:false because only part + // that should have push down replacer is join condition, and it should only contain dimensions + // Other way of thinking about it: join condition is more like filter than projection + cube_scan_wrapper( + wrapped_select( + "WrappedSelectSelectType:Projection", + wrapper_pullup_replacer( + wrapped_select_projection_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + wrapper_pullup_replacer( + wrapped_select_subqueries_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + wrapper_pullup_replacer( + wrapped_select_group_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + wrapper_pullup_replacer( + wrapped_select_aggr_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + wrapper_pullup_replacer( + wrapped_select_window_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + wrapper_pullup_replacer( + // Can move left_input here without checking if it's CubeScan + // Check for WrapperReplacerContextUngroupedScan:true should be enough + "?left_input", + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + // We don't want to use list rules here, because ?right_input is already done + wrapped_select_joins( + wrapped_select_join( + wrapper_pullup_replacer( + "?right_input", + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + wrapper_pushdown_replacer( + "?out_join_expr", + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + "?out_join_type", + ), + // pullup(tail) just so it could be easily picked up by pullup rules + wrapper_pullup_replacer( + wrapped_select_joins_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + ), + wrapper_pullup_replacer( + wrapped_select_filter_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + wrapped_select_having_expr_empty_tail(), + "WrappedSelectLimit:None", + "WrappedSelectOffset:None", + wrapper_pullup_replacer( + wrapped_select_order_expr_empty_tail(), + wrapper_replacer_context( + "?left_alias_to_cube", + "?left_push_to_cube", + "WrapperReplacerContextInProjection:false", + "?left_cube_members", + "?out_grouped_subqueries", + "WrapperReplacerContextUngroupedScan:false", + "?input_data_source", + ), + ), + "WrappedSelectAlias:None", + "WrappedSelectDistinct:false", + // left push-to-Cube dictates the resulting push-to-Cube + "?out_push_to_cube", + // both inputs are grouped, so result is grouped as well + "WrappedSelectUngroupedScan:false", + ), + "CubeScanWrapperFinalized:false", + ), + self.transform_grouped_join_grouped( + "?left_on", + "?left_push_to_cube", + "?right_on", + "?in_join_type", + "?out_join_expr", + "?out_join_type", + "?out_grouped_subqueries", + "?out_push_to_cube", + ), + ), ]); // DataFusion plans complex join conditions as Filter(?join_condition, CrossJoin(...)) @@ -991,4 +1210,103 @@ impl WrapperRules { return false; } } + + fn transform_grouped_join_grouped( + &self, + left_on_var: &'static str, + left_push_to_cube_var: &'static str, + right_on_var: &'static str, + in_join_type_var: &'static str, + out_join_expr_var: &'static str, + out_join_type_var: &'static str, + out_grouped_subqueries_var: &'static str, + out_push_to_cube_var: &'static str, + ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { + let left_on_var = var!(left_on_var); + let left_push_to_cube_var = var!(left_push_to_cube_var); + + let right_on_var = var!(right_on_var); + + let in_join_type_var = var!(in_join_type_var); + + let out_join_expr_var = var!(out_join_expr_var); + let out_join_type_var = var!(out_join_type_var); + let out_grouped_subqueries_var = var!(out_grouped_subqueries_var); + let out_push_to_cube_var = var!(out_push_to_cube_var); + + move |egraph, subst| { + // We are going to generate join with grouped subquery + // TODO Do we have to check stuff like `transform_check_subquery_allowed` is checking: + // * Both inputs depend on a single data source + // * SQL generator for that data source have `expressions/subquery` template + // It could be checked later, in WrappedSelect as well + + for left_join_on in var_iter!(egraph[subst[left_on_var]], JoinLeftOn) { + for right_join_on in var_iter!(egraph[subst[right_on_var]], JoinRightOn) { + // Don't check left and right, they are already grouped + + for in_join_type in + var_list_iter!(egraph[subst[in_join_type_var]], JoinJoinType).cloned() + { + for left_push_to_cube in var_list_iter!( + egraph[subst[left_push_to_cube_var]], + WrapperReplacerContextPushToCube + ) + .cloned() + { + // TODO what's a proper way to find table expression alias? + let Some(right_join_alias) = right_join_on + .iter() + .filter_map(|c| c.relation.as_ref()) + .next() + .cloned() + else { + return false; + }; + + let Some(out_join_expr) = Self::build_join_expr( + egraph, + left_join_on.clone(), + right_join_on.clone(), + ) else { + return false; + }; + + // LHS is grouped, RHS is grouped + // Don't pass ungrouped queries from below, their qualifiers should not be accessible during join condition rewrite + let out_grouped_subqueries = vec![right_join_alias]; + + subst.insert(out_join_expr_var, out_join_expr); + subst.insert( + out_join_type_var, + egraph.add(LogicalPlanLanguage::WrappedSelectJoinJoinType( + WrappedSelectJoinJoinType(in_join_type.0), + )), + ); + subst.insert( + out_grouped_subqueries_var, + egraph.add( + LogicalPlanLanguage::WrapperReplacerContextGroupedSubqueries( + WrapperReplacerContextGroupedSubqueries( + out_grouped_subqueries, + ), + ), + ), + ); + subst.insert( + out_push_to_cube_var, + egraph.add(LogicalPlanLanguage::WrappedSelectPushToCube( + WrappedSelectPushToCube(left_push_to_cube.0), + )), + ); + + return true; + } + } + } + } + + return false; + } + } } diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index a5d01c7391492..75bd04812e53f 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -700,6 +700,7 @@ pub fn sql_generator( ("functions/LOWER".to_string(), "LOWER({{ args_concat }})".to_string()), ("functions/UPPER".to_string(), "UPPER({{ args_concat }})".to_string()), ("functions/PERCENTILECONT".to_string(), "PERCENTILE_CONT({{ args_concat }})".to_string()), + ("expressions/query_aliased".to_string(), "{{ query }} AS {{ quoted_alias }}".to_string()), ("expressions/extract".to_string(), "EXTRACT({{ date_part }} FROM {{ expr }})".to_string()), ( "statements/select".to_string(), @@ -708,13 +709,18 @@ pub fn sql_generator( {% if from %} FROM ( {{ from | indent(2) }} -) AS {{ from_alias }} {% endif %} {% if filter %} +) AS {{ from_alias }} {% endif %}{% for join in joins %} +{{ join }}{% endfor %}{% if filter %} WHERE {{ filter }}{% endif %}{% if group_by %} GROUP BY {{ group_by }}{% endif %}{% if order_by %} ORDER BY {{ order_by | map(attribute='expr') | join(', ') }}{% endif %}{% if limit is not none %} LIMIT {{ limit }}{% endif %}{% if offset is not none %} OFFSET {{ offset }}{% endif %}"#.to_string(), ), + ( + "statements/join".to_string(), + "{{ join_type }} JOIN {{ source }} ON {{ condition }}".to_string(), + ), ( "statements/group_by_exprs".to_string(), "{{ group_by | map(attribute='index') | join(', ') }}".to_string(), diff --git a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs index 94c28faf88220..a200c204a6968 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_wrapper.rs @@ -1193,6 +1193,75 @@ WHERE assert!(literal_re.is_match(&sql)); } +/// Regression test for smoke test "select __user and literal grouped under wrapper". +/// Inner CTE has unaliased DATE_TRUNC expressions; outer query references those columns +/// through the SubqueryAlias qualifier (`cube_scan_subq`). The CTE-level Remapper must +/// publish a mapping under the SubqueryAlias qualifier — otherwise the outer projection +/// cannot remap `cube_scan_subq.datetrunc(Utf8("day"),...)` to the short alias, and the +/// DataFusion-internal expression name leaks into the generated SQL. +#[tokio::test] +async fn test_single_cube_grouped_wrapper_with_join_field() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" +WITH +cube_scan_subq AS ( + SELECT + customer_gender AS my_gender, + DATE_TRUNC('month', order_date) AS my_order_month, + __user AS my_user, + 1 AS my_literal, + id, + DATE_TRUNC('day', order_date), + __cubeJoinField, + 2 + FROM KibanaSampleDataEcommerce + GROUP BY 1,2,3,4,5,6,7,8 +), +filter_subq AS ( + SELECT + customer_gender gender_filter + FROM KibanaSampleDataEcommerce + GROUP BY + gender_filter +) +SELECT + my_order_month, + my_gender, + my_user, + my_literal +FROM cube_scan_subq +WHERE + my_gender IN ( + SELECT + gender_filter + FROM filter_subq + ) +GROUP BY 1,2,3,4 +ORDER BY 1,2,3,4 +; +"# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let _physical_plan = query_plan.as_physical_plan().await.unwrap(); + + let logical_plan = query_plan.as_logical_plan(); + let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql; + assert!( + !sql.contains("datetrunc(Utf8"), + "wrapped SQL leaked DataFusion-rendered expr name (e.g. datetrunc(Utf8(\"day\"),...)):\n{}", + sql + ); +} + /// Test that WrappedSelect(... limit=Some(0) ...) will render it correctly #[tokio::test] async fn test_wrapper_limit_zero() { diff --git a/rust/cubesql/cubesql/src/transport/service.rs b/rust/cubesql/cubesql/src/transport/service.rs index fb95c830b1fbd..364306be5298d 100644 --- a/rust/cubesql/cubesql/src/transport/service.rs +++ b/rust/cubesql/cubesql/src/transport/service.rs @@ -424,6 +424,7 @@ impl SqlTemplates { pub fn select( &self, from: String, + joins: Vec, projection: Vec, group_by: Vec, group_descs: Vec>, @@ -463,6 +464,7 @@ impl SqlTemplates { "statements/select", context! { from => from, + joins => joins, select_concat => select_concat, group_by => group_by_expr, aggregate => aggregate, @@ -993,4 +995,25 @@ impl SqlTemplates { pub fn inner_join(&self) -> Result { self.render_template("join_types/inner", context! {}) } + + pub fn query_aliased(&self, query: &str, alias: &str) -> Result { + let bracketed_query = format!("({})", query); + let quoted_alias = self.quote_identifier(alias)?; + self.render_template( + "expressions/query_aliased", + context! { query => bracketed_query, quoted_alias => quoted_alias }, + ) + } + + pub fn join( + &self, + join_type: &str, + source: &str, + condition: &str, + ) -> Result { + self.render_template( + "statements/join", + context! { join_type => join_type, source => source, condition => condition }, + ) + } }