Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/Analyzer/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,84 @@ void resolveAggregateFunctionNodeByName(FunctionNode & function_node, const Stri
function_node.resolveAsAggregateFunction(std::move(aggregate_function));
}

namespace
{

/// Finalize __aliasMarker nodes right before distributed SQL boundaries.
/// This pass preserves nested markers and materializes arg2 to String constant
/// only when arg2 is ColumnNode.
class FinalizeAliasMarkersForDistributedSerializationVisitor : public InDepthQueryTreeVisitor<FinalizeAliasMarkersForDistributedSerializationVisitor>
{
public:
explicit FinalizeAliasMarkersForDistributedSerializationVisitor(ContextPtr context_)
: context(std::move(context_))
{}

bool shouldTraverseTopToBottom() const
{
return false;
}

static bool needChildVisit(const QueryTreeNodePtr &, const QueryTreeNodePtr &)
{
/// Keep traversing marker payload recursively so nested chains are preserved
/// and each marker can materialize its own arg2 when needed.
return true;
}

void visitImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node || function_node->getFunctionName() != "__aliasMarker")
return;

auto & arguments = function_node->getArguments().getNodes();
if (arguments.size() != 2 || !arguments[0] || !arguments[1])
return;

String alias_id;
if (const auto * marker_column_node = arguments[1]->as<ColumnNode>())
{
if (const auto & marker_source = marker_column_node->getColumnSourceOrNull();
marker_source && marker_source->hasAlias())
{
alias_id = marker_source->getAlias() + "." + marker_column_node->getColumnName();
}
else
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"__aliasMarker expects the second argument to resolve to a column with a source alias before distributed serialization. "
"Column '{}' has an unnamed or missing source",
marker_column_node->getColumnName());
}
}
else if (const auto * marker_id_node = arguments[1]->as<ConstantNode>();
marker_id_node && isString(marker_id_node->getResultType()))
{
/// Already materialized marker id from a previous hop. Keep as is.
return;
}

if (alias_id.empty())
return;

arguments[1] = std::make_shared<ConstantNode>(std::move(alias_id), std::make_shared<DataTypeString>());
resolveOrdinaryFunctionNodeByName(*function_node, "__aliasMarker", context);
}

private:
ContextPtr context;
};

}

void finalizeAliasMarkersForDistributedSerialization(QueryTreeNodePtr & node, const ContextPtr & context)
{
FinalizeAliasMarkersForDistributedSerializationVisitor visitor(context);
visitor.visit(node);
}

std::pair<QueryTreeNodePtr, bool> getExpressionSource(const QueryTreeNodePtr & node)
{
if (const auto * column = node->as<ColumnNode>())
Expand Down
4 changes: 4 additions & 0 deletions src/Analyzer/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ void resolveOrdinaryFunctionNodeByName(FunctionNode & function_node, const Strin
/// Arguments and parameters are taken from the node.
void resolveAggregateFunctionNodeByName(FunctionNode & function_node, const String & function_name);

/// Finalize __aliasMarker nodes before distributed SQL boundaries by materializing
/// marker ids in arg2 from ColumnNode to String ConstantNode when needed.
void finalizeAliasMarkersForDistributedSerialization(QueryTreeNodePtr & node, const ContextPtr & context);

/// Returns single source of expression node.
/// First element of pair is source node, can be nullptr if there are no sources or multiple sources.
/// Second element of pair is true if there is at most one source, false if there are multiple sources.
Expand Down
2 changes: 1 addition & 1 deletion src/Functions/identity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ REGISTER_FUNCTION(AliasMarker)
{
factory.registerFunction<FunctionAliasMarker>(FunctionDocumentation{
.description = R"(
Internal function that marks ALIAS column expressions for the analyzer. Not intended for direct use.
Internal function. Not for direct use.
)",
.syntax = {"__aliasMarker(expr, alias_name)"},
.arguments = {
Expand Down
68 changes: 64 additions & 4 deletions src/Functions/identity.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,68 @@ struct AliasMarkerName
static constexpr auto name = "__aliasMarker";
};

/**
* __aliasMarker is an internal function used to enforce an alias projection step in the plan exactly
* where it appears in a query received from the initiator.
*
* It allows the initiator to take better control over the aliases returned by shards, including cases
* where the final projection step is skipped due to the WithMergeableState stage. The main usage
* scenario is when the initiator injects an expression that must behave like a real column from the
* initiator's point of view. Namely, this happens after expanding an ALIAS column in a distributed
* table to its underlying expression.
*
* For example, if the initiator executes:
*
* SELECT foo AS bar FROM distr
*
* and `foo` is an ALIAS column such as `1 + x`, the remote query becomes:
*
* SELECT __aliasMarker(1 + x, 'table1.foo') AS bar FROM local AS table1
*
* This must not be confused with normal SQL aliases that appear in the query text: those participate
* in user-visible query semantics and may or may not be materialized depending on the execution stage.
* The user-facing SQL alias (`bar` in the example above) is separate and must stay untouched.
*
* A normal SQL alias cannot be used instead of __aliasMarker here because it may interfere with user
* query logic, clash with existing names, and in the mergeable-state path the final projection step
* that normally assigns aliases is intentionally skipped (see the conditional
* createComputeAliasColumnsStep(...) path in PlannerJoinTree::buildQueryPlanForTableExpression()).
*
* Preserving that identity is important because otherwise remote headers may diverge from initiator
* expectations, leading to header mismatches, incorrect column associations, or column-count
* mismatches.
*
* It slightly differs from the __actionName function (which is used for virtual column injection in
* engine=Merge), which only supports a constant string and survives as a normal function node with a
* forced result name, while __aliasMarker is completely removed from the query plan and supports any
* SQL expression as its first argument.
*
* The marker also prevents distinct logical columns with identical expressions from being merged
* into a single transport column. For example:
*
* SELECT 2 * x AS x, 2 * x AS y
*
* must still produce two columns; otherwise both expressions could collapse into a single
* `multiply(2, x)` output and break distributed header reconciliation.
*
* Lifecycle / invariants:
* 1) Injected around rewritten alias expressions that need stable transport identity, with a second
* argument pointing to the column in the query tree.
* 2) In later phases, some column manipulations and renames may happen (namely after
* createUniqueAliasesIfNecessary) before the column gets its final name.
* 3) After that, and before passing the query down to shards, the second argument of __aliasMarker
* gets "materialized": the column reference id is converted to a String identifier.
* 4) Consumed on the receiver by adding a projection step where it appears, so that identity is
* enforced in actions without changing the user-facing aliasing logic.
* 5) Preserved while forwarding to the next hop. Nested marker chains are allowed, and each marker
* may contribute an alias step during actions construction.
*
* This is a temporary bridge while distributed plan transport still relies on SQL text in these
* paths. As query plan serialization potentially fully replaces that boundary, this marker path may
* become unnecessary. However, to support the same behavior with serialize_query_plan, query plan
* modifications would still be required to control the names of those injected expressions.
*/

class FunctionAliasMarker : public IFunction
{
public:
Expand All @@ -110,7 +172,7 @@ class FunctionAliasMarker : public IFunction

String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {}; }
bool isSuitableForConstantFolding() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }

Expand All @@ -119,14 +181,12 @@ class FunctionAliasMarker : public IFunction
if (arguments.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker expects 2 arguments");

if (!WhichDataType(arguments[1]).isString())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker is internal and should not be used directly");

return arguments.front();
}

ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
// normally never executed, replaced with 1st arg during plan builing.
return arguments.front().column;
}
};
Expand Down
36 changes: 21 additions & 15 deletions src/Planner/PlannerActionsVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <Functions/indexHint.h>

#include <Interpreters/ExpressionActionsSettings.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>

Expand Down Expand Up @@ -88,6 +89,17 @@ String calculateActionNodeNameWithCastIfNeeded(const ConstantNode & constant_nod
return buffer.str();
}

String tryExtractAliasMarkerIdFromSecondArgument(const QueryTreeNodePtr & argument)
{
if (const auto * second_argument_constant = argument->as<ConstantNode>();
second_argument_constant && isString(second_argument_constant->getResultType()))
{
return second_argument_constant->getValue().safeGet<String>();
}

return {};
}

class ActionNodeNameHelper
{
public:
Expand Down Expand Up @@ -184,14 +196,12 @@ class ActionNodeNameHelper
{
/// Perform sanity check, because user may call this function with unexpected arguments
const auto & function_argument_nodes = function_node.getArguments().getNodes();
if (function_argument_nodes.size() == 2)
{
if (const auto * second_argument = function_argument_nodes.at(1)->as<ConstantNode>())
{
if (isString(second_argument->getResultType()))
result = second_argument->getValue().safeGet<String>();
}
}
if (function_argument_nodes.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker expects 2 arguments");

result = tryExtractAliasMarkerIdFromSecondArgument(function_argument_nodes.at(1));
if (result.empty())
result = calculateActionNodeName(function_argument_nodes.at(0));

/// Empty node name is not allowed and leads to logical errors
if (result.empty())
Expand Down Expand Up @@ -1119,15 +1129,11 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi
if (function_arguments.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker expects 2 arguments");

const auto * alias_id_node = function_arguments.at(1)->as<ConstantNode>();
if (!alias_id_node || !isString(alias_id_node->getResultType()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker is internal and should not be used directly");

const auto & alias_id = alias_id_node->getValue().safeGet<String>();
auto [child_name, levels] = visitImpl(function_arguments.at(0));
auto alias_id = tryExtractAliasMarkerIdFromSecondArgument(function_arguments.at(1));
if (alias_id.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function __aliasMarker is internal and should not be used directly");
alias_id = child_name;

auto [child_name, levels] = visitImpl(function_arguments.at(0));
if (alias_id == child_name)
return {child_name, levels};

Expand Down
51 changes: 1 addition & 50 deletions src/Planner/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,62 +198,13 @@ ASTPtr queryNodeToSelectQuery(const QueryTreeNodePtr & query_node, bool set_subq
return result_ast;
}

namespace
{
class NormalizeAliasMarkerVisitor : public InDepthQueryTreeVisitor<NormalizeAliasMarkerVisitor>
{
public:
void visitImpl(QueryTreeNodePtr & node)
{
auto * function_node = node->as<FunctionNode>();
if (!function_node || function_node->getFunctionName() != "__aliasMarker")
return;

auto & arguments = function_node->getArguments().getNodes();
if (arguments.size() != 2)
return;

while (true)
{
auto * inner_function = arguments.front()->as<FunctionNode>();
if (!inner_function || inner_function->getFunctionName() != "__aliasMarker")
break;

auto & inner_arguments = inner_function->getArguments().getNodes();
if (inner_arguments.size() != 2)
break;

arguments.front() = inner_arguments.front();
}
}

bool needChildVisit(QueryTreeNodePtr & parent, QueryTreeNodePtr & child)
{
auto * parent_function = parent->as<FunctionNode>();
if (parent_function && parent_function->getFunctionName() == "__aliasMarker")
return false;

auto child_node_type = child->getNodeType();
return !(child_node_type == QueryTreeNodeType::QUERY || child_node_type == QueryTreeNodeType::UNION);
}
};

void normalizeAliasMarkersInQueryTree(QueryTreeNodePtr & node)
{
NormalizeAliasMarkerVisitor visitor;
visitor.visit(node);
}
}

ASTPtr queryNodeToDistributedSelectQuery(const QueryTreeNodePtr & query_node)
{
/// Remove CTEs information from distributed queries.
/// Now, if cte_name is set for subquery node, AST -> String serialization will only print cte name.
/// But CTE is defined only for top-level query part, so may not be sent.
/// Removing cte_name forces subquery to be always printed.
auto query_node_to_convert = query_node->clone();
normalizeAliasMarkersInQueryTree(query_node_to_convert);
auto ast = queryNodeToSelectQuery(query_node_to_convert, /*set_subquery_cte_name=*/false);
auto ast = queryNodeToSelectQuery(query_node, /*set_subquery_cte_name=*/false);
return ast;
}

Expand Down
Loading
Loading