diff --git a/src/Storages/HybridSegmentPruner.cpp b/src/Storages/HybridSegmentPruner.cpp new file mode 100644 index 000000000000..57b835df1774 --- /dev/null +++ b/src/Storages/HybridSegmentPruner.cpp @@ -0,0 +1,451 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace DB +{ + +namespace +{ + +/// Bounded DNF defaults: expand at most this many multi-disjunct OR groups, +/// and at most this many total branches. Anything beyond returns Keep. +constexpr size_t MAX_OR_GROUPS = 2; +constexpr size_t MAX_TOTAL_BRANCHES = 4; + +enum class CompareOp : uint8_t +{ + Less, + LessOrEqual, + Greater, + GreaterOrEqual, + Equals, +}; + +/// Per-column domain accumulator. `range` is the typed interval (closed/open ends, possibly +/// unbounded) tightened by each comparison atom. `allowed`, when set, constrains the column +/// to a finite set of typed values (introduced by `=` or `IN` atoms). +struct ColumnDomain +{ + Range range = Range::createWholeUniverse(); + std::optional> allowed; + bool empty = false; +}; + +/// Trim `allowed` against the current range and check overall emptiness. Returns true if the +/// domain is still satisfiable; sets `empty` and returns false if it's been narrowed to nothing. +bool finalizeDomain(ColumnDomain & domain) +{ + if (domain.empty || domain.range.empty()) + return !(domain.empty = true); + if (domain.allowed) + { + for (auto it = domain.allowed->begin(); it != domain.allowed->end();) + { + /// Use intersectsRange(Range(point)) rather than Range::contains(FieldRef) because the + /// Core/Range.cpp implementation has a buggy (effectively always-false) semantics. + if (domain.range.intersectsRange(Range(*it))) + ++it; + else + it = domain.allowed->erase(it); + } + if (domain.allowed->empty()) + return !(domain.empty = true); + } + return true; +} + +/// One typed atom extracted from a supported AST shape. 
+struct TypedAtom +{ + String column; + /// For comparisons: op + single value. For IN: op == Equals + values populated. + CompareOp op = CompareOp::Equals; + Field value; + std::vector values; /// Used only for IN. + bool is_in = false; +}; + +/// Look up Hybrid column type. Returns nullptr if the column is not part of the Hybrid schema. +DataTypePtr findColumnType(const NamesAndTypesList & cols, const String & name) +{ + for (const auto & c : cols) + if (c.name == name) + return c.type; + return nullptr; +} + +/// Collect top-level conjuncts, flattening nested `and(and(...), ...)`. +void collectConjuncts(const ASTPtr & ast, std::vector & out) +{ + if (!ast) + return; + if (const auto * func = ast->as(); func && func->name == "and" && func->arguments) + { + for (const auto & child : func->arguments->children) + collectConjuncts(child, out); + return; + } + out.push_back(ast); +} + +void collectDisjuncts(const ASTPtr & ast, std::vector & out) +{ + if (!ast) + return; + if (const auto * func = ast->as(); func && func->name == "or" && func->arguments) + { + for (const auto & child : func->arguments->children) + collectDisjuncts(child, out); + return; + } + out.push_back(ast); +} + +/// Try to extract a Field of the given type from an arbitrary AST expression by constant-folding. +/// Returns nullopt if the expression is not foldable or the result cannot be coerced to `target_type`. +std::optional evalAndCoerce( + const ASTPtr & ast, const IDataType & target_type, const ContextPtr & context) +{ + auto evaluated = tryEvaluateConstantExpression(ast, context); + if (!evaluated) + return {}; + if (!evaluated->second) + return {}; + return convertFieldToTypeStrict(evaluated->first, *evaluated->second, target_type, FormatSettings{}); +} + +/// If `ast` is a tuple-of-literals or constant-folds to one, return the typed elements. 
+std::optional> evalTupleAndCoerce( + const ASTPtr & ast, const IDataType & target_type, const ContextPtr & context) +{ + auto evaluated = tryEvaluateConstantExpression(ast, context); + if (!evaluated) + return {}; + if (evaluated->first.getType() != Field::Types::Tuple) + return {}; + const auto & tup = evaluated->first.safeGet(); + std::vector out; + out.reserve(tup.size()); + for (const auto & f : tup) + { + /// Each element's source type is the tuple's element type. We pass nullptr as + /// the from-type hint here; convertFieldToTypeStrict will conservatively reject + /// imprecise/lossy coercions, which is what we want. + Field coerced = convertFieldToType(f, target_type, nullptr, FormatSettings{}); + if (coerced.isNull() && !f.isNull()) + return {}; + out.push_back(std::move(coerced)); + } + return out; +} + +/// Extract a typed atom from a comparison/IN AST. `negated` reflects an outer `NOT`. +/// Returns nullopt for unsupported shapes (caller treats branch as satisfiable). +std::optional extractAtom( + ASTPtr ast, + bool negated, + const NamesAndTypesList & hybrid_cols, + const ContextPtr & context) +{ + /// Peel off outer `not` once. + if (const auto * func = ast->as(); func && func->name == "not") + { + if (!func->arguments || func->arguments->children.size() != 1) + return {}; + return extractAtom(func->arguments->children[0], !negated, hybrid_cols, context); + } + + const auto * func = ast->as(); + if (!func || !func->arguments || func->arguments->children.size() != 2) + return {}; + + String fname = func->name; + /// Effective op after applying `negated`. + auto invert = [](CompareOp op) -> std::optional + { + switch (op) + { + case CompareOp::Less: return CompareOp::GreaterOrEqual; + case CompareOp::LessOrEqual: return CompareOp::Greater; + case CompareOp::Greater: return CompareOp::LessOrEqual; + case CompareOp::GreaterOrEqual: return CompareOp::Less; + case CompareOp::Equals: return std::nullopt; /// `!=` is deferred. 
+ } + return std::nullopt; + }; + + if (fname == "in") + { + if (negated) + return {}; /// NOT IN deferred. + + const auto & lhs = func->arguments->children[0]; + const auto & rhs = func->arguments->children[1]; + + const auto * ident = lhs->as(); + if (!ident) + return {}; + /// Use the unqualified column name. The analyzer rewrites bare `ts` to + /// `__table1.ts`; ownership-by-table is enforced separately by the + /// caller (which skips pruning when the query has a JOIN). + auto col_type = findColumnType(hybrid_cols, ident->shortName()); + if (!col_type) + return {}; + + auto values = evalTupleAndCoerce(rhs, *col_type, context); + if (!values) + return {}; + + TypedAtom atom; + atom.column = ident->shortName(); + atom.is_in = true; + atom.values = std::move(*values); + return atom; + } + + CompareOp op; + if (fname == "less") op = CompareOp::Less; + else if (fname == "lessOrEquals") op = CompareOp::LessOrEqual; + else if (fname == "greater") op = CompareOp::Greater; + else if (fname == "greaterOrEquals") op = CompareOp::GreaterOrEqual; + else if (fname == "equals") op = CompareOp::Equals; + else + return {}; /// Unsupported function (notEquals, like, etc.) + + if (negated) + { + auto inv = invert(op); + if (!inv) + return {}; + op = *inv; + } + + /// Identify which side is the column and which is the constant. + ASTPtr col_ast = func->arguments->children[0]; + ASTPtr val_ast = func->arguments->children[1]; + bool flipped = false; + const auto * col_ident = col_ast->as(); + if (!col_ident) + { + col_ident = val_ast->as(); + if (!col_ident) + return {}; + std::swap(col_ast, val_ast); + flipped = true; + } + auto col_type = findColumnType(hybrid_cols, col_ident->shortName()); + if (!col_type) + return {}; + + /// If we swapped sides, the comparison is reversed: `5 < x` ≡ `x > 5`. 
+ if (flipped) + { + switch (op) + { + case CompareOp::Less: op = CompareOp::Greater; break; + case CompareOp::LessOrEqual: op = CompareOp::GreaterOrEqual; break; + case CompareOp::Greater: op = CompareOp::Less; break; + case CompareOp::GreaterOrEqual: op = CompareOp::LessOrEqual; break; + case CompareOp::Equals: break; + } + } + + auto coerced = evalAndCoerce(val_ast, *col_type, context); + if (!coerced) + return {}; + + TypedAtom atom; + atom.column = col_ident->shortName(); + atom.op = op; + atom.value = std::move(*coerced); + return atom; +} + +/// Apply an atom to the per-column domain map. Returns true if the branch can still +/// be satisfiable; false if the atom proves the branch unsatisfiable. +bool applyAtomToDomains( + std::unordered_map & domains, const TypedAtom & atom) +{ + auto & domain = domains[atom.column]; + if (domain.empty) + return false; + + if (atom.is_in) + { + std::set incoming(atom.values.begin(), atom.values.end()); + if (incoming.empty()) + return !(domain.empty = true); + if (!domain.allowed) + domain.allowed = std::move(incoming); + else + { + std::set intersection; + for (const auto & f : *domain.allowed) + if (incoming.contains(f)) + intersection.insert(f); + domain.allowed = std::move(intersection); + } + return finalizeDomain(domain); + } + + Range atom_range = Range::createWholeUniverse(); + switch (atom.op) + { + case CompareOp::Less: atom_range = Range::createRightBounded(atom.value, /*included*/ false); break; + case CompareOp::LessOrEqual: atom_range = Range::createRightBounded(atom.value, /*included*/ true); break; + case CompareOp::Greater: atom_range = Range::createLeftBounded(atom.value, /*included*/ false); break; + case CompareOp::GreaterOrEqual: atom_range = Range::createLeftBounded(atom.value, /*included*/ true); break; + case CompareOp::Equals: atom_range = Range(atom.value); break; + } + + auto narrowed = domain.range.intersectWith(atom_range); + if (!narrowed) + return !(domain.empty = true); + domain.range = 
std::move(*narrowed); + + if (atom.op == CompareOp::Equals) + { + if (!domain.allowed) + domain.allowed = std::set{atom.value}; + else if (!domain.allowed->contains(atom.value)) + return !(domain.empty = true); + else + domain.allowed = std::set{atom.value}; + } + + return finalizeDomain(domain); +} + +/// True if every atom in `branch` extracts to a supported typed atom AND the +/// per-column intersection is empty. Unsupported atoms make the branch satisfiable +/// (keep), as required by the fail-open contract. +bool branchIsUnsatisfiable( + const std::vector & branch_atoms, + const NamesAndTypesList & hybrid_columns, + const ContextPtr & context) +{ + std::unordered_map domains; + for (const auto & ast : branch_atoms) + { + auto atom = extractAtom(ast, /*negated*/ false, hybrid_columns, context); + if (!atom) + return false; /// Unsupported atom → branch is "unknown" → treat as satisfiable. + if (!applyAtomToDomains(domains, *atom)) + return true; + } + return false; +} + +} + +bool canPruneHybridSegment( + const ASTPtr & prewhere, + const ASTPtr & where, + const ASTPtr & segment_predicate, + const NamesAndTypesList & hybrid_columns, + const ContextPtr & context) +try +{ + /// Step 1: split top-level AND of (prewhere, where, segment_predicate) into conjuncts. + std::vector conjuncts; + collectConjuncts(prewhere, conjuncts); + collectConjuncts(where, conjuncts); + collectConjuncts(segment_predicate, conjuncts); + if (conjuncts.empty()) + return false; + + /// Step 2: classify each conjunct as a mandatory atom (singleton) or a multi-disjunct + /// OR group (alternative). Atoms inside an OR alternative are themselves AND-flattened + /// so each disjunct can be a small conjunction such as `date = today() AND id IN (...)`. 
+ std::vector mandatory; + std::vector>> or_groups; /// group → branch → atoms + + for (const auto & c : conjuncts) + { + std::vector disjuncts; + collectDisjuncts(c, disjuncts); + if (disjuncts.size() == 1) + { + mandatory.push_back(disjuncts.front()); + continue; + } + std::vector> branches; + branches.reserve(disjuncts.size()); + for (const auto & d : disjuncts) + { + std::vector b; + collectConjuncts(d, b); + branches.push_back(std::move(b)); + } + or_groups.push_back(std::move(branches)); + } + + /// Step 3: enforce bounded DNF. + if (or_groups.size() > MAX_OR_GROUPS) + return false; + size_t total = 1; + for (const auto & g : or_groups) + { + if (g.empty()) + return false; + total *= g.size(); + if (total > MAX_TOTAL_BRANCHES) + return false; + } + + /// Step 4: if there are no OR groups, evaluate the single mandatory branch directly. + if (or_groups.empty()) + return branchIsUnsatisfiable(mandatory, hybrid_columns, context); + + /// Step 5: cartesian product over OR groups; each combination ANDed with `mandatory` + /// forms a DNF branch. Prune iff every branch is provably unsatisfiable. + std::vector idx(or_groups.size(), 0); + while (true) + { + std::vector branch = mandatory; + for (size_t i = 0; i < or_groups.size(); ++i) + { + const auto & disjunct = or_groups[i][idx[i]]; + branch.insert(branch.end(), disjunct.begin(), disjunct.end()); + } + + if (!branchIsUnsatisfiable(branch, hybrid_columns, context)) + return false; + + /// Increment cartesian-product indices. + size_t k = 0; + for (; k < or_groups.size(); ++k) + { + if (++idx[k] < or_groups[k].size()) + break; + idx[k] = 0; + } + if (k == or_groups.size()) + break; + } + + return true; +} +catch (...) +{ + /// Fail-open: any unexpected exception in atom extraction, type coercion, or + /// constant evaluation must not prevent the segment from being scanned normally. 
+ return false; +} + +} diff --git a/src/Storages/HybridSegmentPruner.h b/src/Storages/HybridSegmentPruner.h new file mode 100644 index 000000000000..235dab6704f1 --- /dev/null +++ b/src/Storages/HybridSegmentPruner.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +/// Conservative satisfiability check for Hybrid segment pruning. +/// +/// Combines (PREWHERE AND WHERE AND segment_predicate), restricted to atoms over +/// columns of the Hybrid table, normalizes via top-level AND/OR walking, and tries +/// to prove the resulting condition unsatisfiable through bounded DNF expansion +/// plus per-column typed range/IN intersection. +/// +/// Returns true only when the conjunction is provably empty (the segment can be +/// pruned). Returns false in all other cases — including unsupported predicates, +/// constant-folding failures, type-coercion ambiguity, and exceptions — so the +/// caller can fall back to scanning the segment normally. +/// +/// Inputs: +/// - prewhere, where, segment_predicate: ASTs (any may be null). +/// The caller is responsible for removing JOIN-side predicates and for +/// substituting hybridParam(...) literals before invoking this function. +/// - hybrid_columns: column names and types from the Hybrid storage snapshot. +/// Atoms referencing columns not in this list are treated as unsupported and +/// keep the segment. +/// - context: used for constant-expression evaluation. 
+bool canPruneHybridSegment( + const ASTPtr & prewhere, + const ASTPtr & where, + const ASTPtr & segment_predicate, + const NamesAndTypesList & hybrid_columns, + const ContextPtr & context); + +} diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index ff1e0025aa9b..60937ccac6cf 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -48,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -102,7 +104,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -536,6 +540,26 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( } } + /// Hybrid segment pruning: mirror the per-shard pruning above, but at the segment level. + /// When a segment's predicate is provably unsatisfiable with the user query, drop it from + /// the plan. The base segment is signalled to `read()` by emptying `optimized_cluster` — + /// the same idiom `optimize_skip_unused_shards` uses for empty shard sets — and `nodes` is + /// recomputed automatically from the empty cluster. The verdict is recomputed in `read()` + /// for per-segment skipping; both calls read the watermark snapshot frozen on + /// `storage_snapshot` (see `HybridSnapshotData`), so the two verdicts agree even under a + /// concurrent `ALTER MODIFY SETTING hybrid_watermark_*`. 
+ HybridPruningVerdict pruning_verdict; + if (!segments.empty() || base_segment_predicate) + { + pruning_verdict = computeHybridPruningVerdict(query_info, storage_snapshot, local_context); + if (pruning_verdict.base_pruned) + { + query_info.optimized_cluster = cluster->getClusterWithMultipleShards({}); + cluster = query_info.optimized_cluster; + nodes = getClusterQueriedNodes(settings, cluster); + } + } + if (settings[Setting::distributed_group_by_no_merge]) { if (settings[Setting::distributed_group_by_no_merge] == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION) @@ -560,7 +584,13 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( // TODO: check logic if (!segments.empty()) - nodes += segments.size(); + { + size_t surviving_segments = segments.size(); + for (bool is_pruned : pruning_verdict.segments_pruned) + if (is_pruned && surviving_segments > 0) + --surviving_segments; + nodes += surviving_segments; + } /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. @@ -792,6 +822,17 @@ std::optional StorageDistributed::getOptimizedQueryP StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr) const { + /// For Hybrid tables, freeze the watermark snapshot at snapshot acquisition time so + /// every later phase (`getQueryProcessingStage()`, `read()`) operates on the same + /// values. A concurrent `ALTER MODIFY SETTING hybrid_watermark_*` cannot change what + /// this query sees, which keeps the pruning verdict — and therefore the chosen + /// processing stage — consistent with the planned segment set. 
+ if (!segments.empty() || base_segment_predicate) + { + auto data = std::make_unique(); + data->watermark_snapshot = hybrid_watermark_params.get(); + return std::make_shared(*this, metadata_snapshot, std::move(data)); + } return std::make_shared(*this, metadata_snapshot); } @@ -1179,6 +1220,141 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, } +ASTPtr StorageDistributed::substituteHybridWatermarks( + ASTPtr predicate_ast, + const MultiVersion::Version & watermarks) +{ + if (!predicate_ast) + return predicate_ast; + predicate_ast = predicate_ast->clone(); + + std::function replace_hybrid_params = [&](ASTPtr & node) + { + if (auto * func = node->as(); func && func->name == "hybridParam") + { + auto * arg_list = func->arguments ? func->arguments->as() : nullptr; + if (!arg_list || arg_list->children.size() != 2) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() requires exactly 2 arguments: (name, type)"); + + auto * name_lit = arg_list->children[0]->as(); + auto * type_lit = arg_list->children[1]->as(); + if (!name_lit || name_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() first argument (name) must be a string literal"); + if (!type_lit || type_lit->value.getType() != Field::Types::String) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "hybridParam() second argument (type) must be a string literal"); + + const auto & param_name = name_lit->value.safeGet(); + const auto & type_name = type_lit->value.safeGet(); + + if (!watermarks) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", + param_name, param_name); + + auto it = watermarks->find(param_name); + if (it == watermarks->end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' 
to set it", + param_name, param_name); + + auto data_type = DataTypeFactory::instance().get(type_name); + auto col = data_type->createColumn(); + ReadBufferFromString buf(it->second); + data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); + node = make_intrusive((*col)[0]); + return; + } + + for (auto & child : node->children) + replace_hybrid_params(child); + }; + replace_hybrid_params(predicate_ast); + return predicate_ast; +} + +StorageDistributed::HybridPruningVerdict StorageDistributed::computeHybridPruningVerdict( + const SelectQueryInfo & query_info, + const StorageSnapshotPtr & storage_snapshot, + const ContextPtr & local_context) const +{ + StorageDistributed::HybridPruningVerdict verdict; + verdict.segments_pruned.assign(segments.size(), false); + + if (segments.empty() && !base_segment_predicate) + return verdict; + + /// Reuse the watermark snapshot frozen at `getStorageSnapshot()` time. Both + /// `getQueryProcessingStage()` and `read()` call this function with the same + /// `storage_snapshot`, so the verdict is identical across the two calls regardless + /// of any concurrent `ALTER MODIFY SETTING hybrid_watermark_*`. Falling back to a + /// fresh `MultiVersion::get()` only happens when `getStorageSnapshot()` did not + /// attach `HybridSnapshotData` (e.g. a code path that bypasses it); we keep the + /// fallback for defensiveness, but it is not exercised by the standard read path. + if (const auto * hybrid_data = storage_snapshot->data + ? dynamic_cast(storage_snapshot->data.get()) + : nullptr) + verdict.watermark_snapshot = hybrid_data->watermark_snapshot; + else + verdict.watermark_snapshot = hybrid_watermark_params.get(); + + /// Extract Hybrid-owned WHERE/PREWHERE from the user query. JOINs are conservatively + /// excluded — when a JOIN is present, leave both null so the pruner can never claim + /// unsatisfiability based on a joined table's predicate. 
+ NamesAndTypesList hybrid_columns = storage_snapshot->metadata->getColumns().getAll(); + ASTPtr prunable_where; + ASTPtr prunable_prewhere; + ASTSelectQuery * select_for_pruning = nullptr; + if (query_info.query) + { + if (auto * select = query_info.query->as()) + select_for_pruning = select; + else if (auto * union_query = query_info.query->as(); + union_query && union_query->list_of_selects && !union_query->list_of_selects->children.empty()) + select_for_pruning = union_query->list_of_selects->children.front()->as(); + } + if (select_for_pruning) + { + bool has_join = false; + if (auto tables = select_for_pruning->tables()) + { + for (const auto & child : tables->children) + { + if (auto * elem = child->as(); elem && elem->table_join) + { + has_join = true; + break; + } + } + } + if (!has_join) + { + prunable_where = select_for_pruning->where(); + prunable_prewhere = select_for_pruning->prewhere(); + } + } + + auto check = [&](const ASTPtr & predicate_ast) -> bool + { + if (!predicate_ast) + return false; + ASTPtr substituted = substituteHybridWatermarks(predicate_ast, verdict.watermark_snapshot); + return canPruneHybridSegment( + prunable_prewhere, prunable_where, substituted, + hybrid_columns, local_context); + }; + + if (base_segment_predicate) + verdict.base_pruned = check(base_segment_predicate); + + for (size_t i = 0; i < segments.size(); ++i) + verdict.segments_pruned[i] = check(segments[i].predicate_ast); + + return verdict; +} + void StorageDistributed::read( QueryPlan & query_plan, const Names &, @@ -1229,61 +1405,30 @@ void StorageDistributed::read( LOG_TRACE(log, "rewriteSelectQuery (target: {}) -> {}", target, ast->formatForLogging()); }; - auto watermark_snapshot = hybrid_watermark_params.get(); - - auto substitute_watermarks = [&](ASTPtr predicate_ast) -> ASTPtr - { - if (!predicate_ast) - return predicate_ast; - predicate_ast = predicate_ast->clone(); - - std::function replace_hybrid_params = [&](ASTPtr & node) - { - if (auto * func = node->as(); 
func && func->name == "hybridParam") - { - auto * arg_list = func->arguments ? func->arguments->as() : nullptr; - if (!arg_list || arg_list->children.size() != 2) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() requires exactly 2 arguments: (name, type)"); - - auto * name_lit = arg_list->children[0]->as(); - auto * type_lit = arg_list->children[1]->as(); - if (!name_lit || name_lit->value.getType() != Field::Types::String) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() first argument (name) must be a string literal"); - if (!type_lit || type_lit->value.getType() != Field::Types::String) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "hybridParam() second argument (type) must be a string literal"); - - const auto & param_name = name_lit->value.safeGet(); - const auto & type_name = type_lit->value.safeGet(); - - if (!watermark_snapshot) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' to set it", - param_name, param_name); + /// Recompute the Hybrid pruning verdict for per-segment skipping. The watermark snapshot + /// it depends on was frozen at `getStorageSnapshot()` time and is reused via + /// `HybridSnapshotData`, so this verdict matches the one `getQueryProcessingStage()` + /// produced — both the surviving-segment set and the substitution of `hybridParam(...)` + /// literals stay consistent with the chosen processing stage even under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. + HybridPruningVerdict pruning_verdict; + if (!segments.empty() || base_segment_predicate) + pruning_verdict = computeHybridPruningVerdict(query_info, storage_snapshot, local_context); - auto it = watermark_snapshot->find(param_name); - if (it == watermark_snapshot->end()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Hybrid watermark '{}' has no value; use ALTER TABLE ... MODIFY SETTING {} = '...' 
to set it", - param_name, param_name); - - auto data_type = DataTypeFactory::instance().get(type_name); - auto col = data_type->createColumn(); - ReadBufferFromString buf(it->second); - data_type->getDefaultSerialization()->deserializeWholeText(*col, buf, FormatSettings{}); - node = make_intrusive((*col)[0]); - return; - } + auto watermark_snapshot = pruning_verdict.watermark_snapshot + ? pruning_verdict.watermark_snapshot : hybrid_watermark_params.get(); - for (auto & child : node->children) - replace_hybrid_params(child); - }; - replace_hybrid_params(predicate_ast); - return predicate_ast; + auto try_prune_additional = [&](size_t segment_idx, const String & target) -> bool + { + if (segment_idx >= pruning_verdict.segments_pruned.size() || !pruning_verdict.segments_pruned[segment_idx]) + return false; + LOG_TRACE(log, "Hybrid segment pruned (target: {})", target); + return true; }; + if (pruning_verdict.base_pruned) + LOG_TRACE(log, "Hybrid segment pruned (target: {})", base_target); + if (settings[Setting::allow_experimental_analyzer]) { StorageID remote_storage_id = StorageID::createEmpty(); @@ -1294,7 +1439,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, remote_storage_id, remote_table_function_ptr, - substitute_watermarks(base_segment_predicate)); + substituteHybridWatermarks(base_segment_predicate, watermark_snapshot)); Block block = *InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze()); /** For distributed tables we do not need constants in header, since we don't send them to remote servers. * Moreover, constants can break some functions like `hostName` that are constants only for local queries. 
@@ -1311,8 +1456,14 @@ void StorageDistributed::read( if (!segments.empty()) { - for (const auto & segment : segments) + for (size_t segment_idx = 0; segment_idx < segments.size(); ++segment_idx) { + const auto & segment = segments[segment_idx]; + if (try_prune_additional(segment_idx, describe_segment_target(segment))) + continue; + + ASTPtr substituted_predicate = substituteHybridWatermarks(segment.predicate_ast, watermark_snapshot); + // Create a modified query info with the segment predicate SelectQueryInfo additional_query_info = query_info; @@ -1320,7 +1471,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, segment.storage_id ? *segment.storage_id : StorageID::createEmpty(), segment.storage_id ? nullptr : segment.table_function_ast, - substitute_watermarks(segment.predicate_ast)); + substituted_predicate); additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree); additional_query_info.query_tree = std::move(additional_query_tree); @@ -1330,8 +1481,11 @@ void StorageDistributed::read( } } - // For empty shards - avoid early return if we have additional segments - if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) + /// Empty cluster + nothing else to plan: take the same path Distributed already uses + /// when `optimize_skip_unused_shards` filters every shard. For Hybrid this is the + /// "all segments pruned" case (base pruned via empty `optimized_cluster`, every + /// additional pruned via the segments loop above). 
+ if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_query_infos.empty()) return; } else @@ -1341,16 +1495,20 @@ void StorageDistributed::read( modified_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, modified_query_info.query, remote_database, remote_table, remote_table_function_ptr, - substitute_watermarks(base_segment_predicate)); + substituteHybridWatermarks(base_segment_predicate, watermark_snapshot)); log_rewritten_query(base_target, modified_query_info.query); if (!segments.empty()) { - for (const auto & segment : segments) + for (size_t segment_idx = 0; segment_idx < segments.size(); ++segment_idx) { + const auto & segment = segments[segment_idx]; + if (try_prune_additional(segment_idx, describe_segment_target(segment))) + continue; + + ASTPtr resolved_predicate = substituteHybridWatermarks(segment.predicate_ast, watermark_snapshot); SelectQueryInfo additional_query_info = query_info; - ASTPtr resolved_predicate = substitute_watermarks(segment.predicate_ast); if (segment.storage_id) { additional_query_info.query = ClusterProxy::rewriteSelectQuery( @@ -1372,8 +1530,11 @@ void StorageDistributed::read( } } - // For empty shards - avoid early return if we have additional segments - if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) + /// Empty cluster + nothing else to plan: take the same path Distributed already uses + /// when `optimize_skip_unused_shards` filters every shard. For Hybrid this is the + /// "all segments pruned" case (base pruned via empty `optimized_cluster`, every + /// additional pruned via the segments loop above). 
+ if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_query_infos.empty()) { Pipe pipe(std::make_shared(header)); auto read_from_pipe = std::make_unique(std::move(pipe)); @@ -1384,6 +1545,45 @@ void StorageDistributed::read( } } + /// Hybrid case 2: base pruned (cluster empty via `getQueryProcessingStage`'s empty + /// `optimized_cluster`) and at least one additional segment survives. The all-pruned + /// subcase is already handled by the existing empty-cluster early-returns above. We + /// can't call `ClusterProxy::executeQuery` with an empty cluster (its + /// `updateSettingsAndClientInfoForCluster` dereferences `getShardsAddresses().front()` + /// when `is_remote_function=true`), so build the local plans directly. The block below + /// is the same shape as the `additional_query_infos` block in `ClusterProxy::executeQuery` + /// — that block uses the original context (not `new_context`), so we don't depend on the + /// shared distributed-context setup. + if (modified_query_info.getCluster()->getShardsInfo().empty() && !additional_query_infos.empty()) + { + const Block & header_block = *header; + std::vector plans; + plans.reserve(additional_query_infos.size()); + for (const auto & additional_query_info : additional_query_infos) + { + plans.emplace_back(createLocalPlan( + additional_query_info.query, header_block, local_context, + processed_stage, /*shard_num=*/0, /*shard_count=*/1, /*has_missing_objects=*/false, "")); + } + + if (plans.size() == 1) + { + query_plan = std::move(*plans.front()); + } + else + { + SharedHeaders input_headers; + input_headers.reserve(plans.size()); + for (auto & plan : plans) + input_headers.emplace_back(plan->getCurrentHeader()); + + auto union_step = std::make_unique(std::move(input_headers)); + union_step->setStepDescription("Hybrid"); + query_plan.unitePlans(std::move(union_step), std::move(plans)); + } + return; + } + if (!modified_query_info.getCluster()->getShardsInfo().empty() || 
!additional_query_infos.empty()) { ClusterProxy::SelectStreamFactory select_stream_factory = diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 138a49d9c992..1dd6690650fb 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -220,6 +220,41 @@ class StorageDistributed final : public IStorage, WithContext std::vector getDirectoryQueueStatuses() const; static IColumn::Selector createSelector(ClusterPtr cluster, const ColumnWithTypeAndName & result); + + /// Substitute hybridParam(name, type) calls in `predicate_ast` with literal values from + /// `watermarks`. Returns a fresh cloned AST. Pass-through for nullptr. + static ASTPtr substituteHybridWatermarks( + ASTPtr predicate_ast, + const MultiVersion::Version & watermarks); + + /// Hybrid-specific snapshot-time state attached to `StorageSnapshot::data`. Populated + /// once in `StorageDistributed::getStorageSnapshot()` so the watermark values seen by + /// `getQueryProcessingStage()` and `read()` cannot diverge mid-query under a concurrent + /// `ALTER MODIFY SETTING hybrid_watermark_*`. Without this, two independent + /// `MultiVersion::get()` calls could observe different versions and produce inconsistent + /// pruning verdicts (e.g. a `Complete`-stage plan unioned without final merge). + struct HybridSnapshotData : public StorageSnapshot::Data + { + MultiVersion::Version watermark_snapshot; + }; + + /// Per-query Hybrid pruning verdict. Recomputed in both `getQueryProcessingStage()` + /// (to drive the stage decision and empty `optimized_cluster` when the base is pruned) + /// and `read()` (to skip planning of pruned additional segments). The verdict is + /// deterministic across both calls because the watermark snapshot it depends on is + /// taken once at `getStorageSnapshot()` time and reused via `HybridSnapshotData`. 
+ struct HybridPruningVerdict + { + bool base_pruned = false; + std::vector segments_pruned; + MultiVersion::Version watermark_snapshot; + }; + + HybridPruningVerdict computeHybridPruningVerdict( + const SelectQueryInfo & query_info, + const StorageSnapshotPtr & storage_snapshot, + const ContextPtr & local_context) const; + /// Apply the following settings: /// - optimize_skip_unused_shards /// - force_optimize_skip_unused_shards diff --git a/src/Storages/tests/gtest_hybrid_segment_pruner.cpp b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp new file mode 100644 index 000000000000..58d55257a367 --- /dev/null +++ b/src/Storages/tests/gtest_hybrid_segment_pruner.cpp @@ -0,0 +1,91 @@ +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include + +using namespace DB; + +namespace +{ + +ASTPtr parseExpression(const std::string & text) +{ + ParserExpression parser; + return parseQuery(parser, text, 4096, 1000, 1000000); +} + +NamesAndTypesList hybridColumnsForTests() +{ + return { + {"ts", std::make_shared()}, + {"date", std::make_shared()}, + {"customerid", std::make_shared()}, + {"x", std::make_shared()}, + {"y", std::make_shared()}, + }; +} + +class HybridSegmentPrunerTest : public ::testing::Test +{ +public: + static void SetUpTestSuite() + { + tryRegisterFunctions(); + } +}; + +} + +TEST_F(HybridSegmentPrunerTest, RangeContradictionPrunes) +{ + /// `ts > '2025-10-01' AND ts <= '2025-09-01'` → unsat → can prune. + auto where = parseExpression("ts > '2025-10-01'"); + auto seg = parseExpression("ts <= '2025-09-01'"); + EXPECT_TRUE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); +} + +TEST_F(HybridSegmentPrunerTest, OverlappingRangeKeeps) +{ + /// `ts > '2025-10-01' AND ts > '2025-08-01'` → satisfiable → keep. 
+ auto where = parseExpression("ts > '2025-10-01'"); + auto seg = parseExpression("ts > '2025-08-01'"); + EXPECT_FALSE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); +} + +TEST_F(HybridSegmentPrunerTest, BoundedDnfWithConstantFolding) +{ + /// `(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))` + /// combined with `date < '2015-01-01'` is unsat in every DNF branch. + auto where = parseExpression( + "(date = yesterday() AND customerid IN (2, 3)) OR (date = today() AND customerid IN (2, 3))"); + auto seg = parseExpression("date < '2015-01-01'"); + EXPECT_TRUE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, seg, hybridColumnsForTests(), getContext().context)); +} + +TEST_F(HybridSegmentPrunerTest, OrAlternativeNotMandatoryConstraint) +{ + /// `(x < 0 OR y = 1) AND x > 5`: the `x < 0` branch is unsat, + /// but the `y = 1` branch is satisfiable → keep. + auto where = parseExpression("(x < 0 OR y = 1) AND x > 5"); + EXPECT_FALSE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, /*segment*/ nullptr, hybridColumnsForTests(), getContext().context)); +} + +TEST_F(HybridSegmentPrunerTest, UnsupportedAtomInOrKeeps) +{ + /// An OR with an unsupported atom (e.g. `length(...)`) cannot be pruned. + auto where = parseExpression("(length(toString(x)) > 10 OR x = 1) AND x = 2"); + EXPECT_FALSE(canPruneHybridSegment( + /*prewhere*/ nullptr, where, /*segment*/ nullptr, hybridColumnsForTests(), getContext().context)); +} diff --git a/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference b/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference new file mode 100644 index 000000000000..b9a8b083e394 --- /dev/null +++ b/tests/queries/0_stateless/03646_hybrid_segment_pruning.reference @@ -0,0 +1,84 @@ +-- {echoOn} + +-- Test 1: Baseline (no pruning) — both segments planned, Union (Hybrid) present. 
+SELECT count() FROM pruning_t; +4 +SELECT count() FROM pruning_t WHERE value > 0; +4 +EXPLAIN SELECT count() FROM pruning_t WHERE value > 0; +Expression ((Project names + Projection)) + MergingAggregated + Union (Hybrid) + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_hot) + MergingAggregated + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_cold) +-- Test 2: Cold (additional) segment pruned via range contradiction — only base remains. +SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 3: Hot (base) segment pruned — only cold remains as a local plan. +SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +2 +EXPLAIN SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 4: PREWHERE participates in pruning. +SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 5: All segments pruned — getQueryProcessingStage returns FetchColumns, +-- planner inserts ReadNothing, AggregatingTransform synthesizes the default row. 
+SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +0 +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +Expression ((Project names + Projection)) + Aggregating + Expression (Before GROUP BY) + Filter ((WHERE + Change column names to column identifiers)) + ReadNothing (Read from NullSource) +-- {echoOn} + +-- Test 6: three segments, only hot survives. +SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + ReadFromPreparedSource (_exact_count_projection) +-- Test 7: OR alternative is not a mandatory constraint — hot survives via the OR. +SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +2 +EXPLAIN SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +Expression ((Project names + Projection)) + Aggregating + Expression (Before GROUP BY) + Expression ((WHERE + Change column names to column identifiers)) + ReadFromMergeTree (default.local_hot) +-- Test 8: JOIN — pruner conservatively skips, both segments planned. Only the count is +-- asserted here because EXPLAIN's JOIN-side ordering depends on randomized settings the +-- test harness cycles through (e.g. query_plan_join_swap_table). +SELECT count() +FROM pruning_t AS t +INNER JOIN dim AS d ON t.value = d.id +WHERE d.id > 1 AND t.ts <= '2025-08-01'; +2 +-- Test 9: SELECT alias shadows a Hybrid column used by segment predicates. With default +-- prefer_column_name_to_alias=0 the WHERE's `ts` resolves to the alias expression (a +-- constant true for every row); if the pruner mistakenly treated the unresolved `ts` as +-- the Hybrid column it would prune the cold segment (`ts <= '2025-09-01'`) and silently +-- drop those rows. All 4 rows must survive. 
+SELECT count() FROM (SELECT toDateTime('2025-11-01') AS ts, value FROM pruning_t WHERE ts > '2025-10-01'); +4 diff --git a/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql b/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql new file mode 100644 index 000000000000..27cdee91bbab --- /dev/null +++ b/tests/queries/0_stateless/03646_hybrid_segment_pruning.sql @@ -0,0 +1,127 @@ +-- Tags: no-fasttest +-- Tag no-fasttest: requires remote() table function + +SET allow_experimental_hybrid_table = 1; + +-- The EXPLAIN-based assertions below print plan shapes that some randomized session +-- settings perturb. Pin them for deterministic output. None of these affect pruning logic; +-- they just stabilize how the plan is rendered. +SET prefer_localhost_replica = 1; -- avoid ReadFromRemote vs ReadFromMergeTree flips +SET query_plan_join_swap_table = 'false'; -- pin JOIN side ordering +SET use_query_condition_cache = 0; -- consistent EXPLAIN across runs +SET optimize_trivial_count_query = 1; +SET parallel_replicas_local_plan = 0; + +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC; +DROP TABLE IF EXISTS pruning_t SYNC; +DROP TABLE IF EXISTS pruning_t3 SYNC; +DROP TABLE IF EXISTS pruning_or SYNC; +DROP TABLE IF EXISTS dim SYNC; + +CREATE TABLE local_hot (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +CREATE TABLE local_cold (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +INSERT INTO local_hot VALUES ('2025-10-15', 1), ('2025-11-01', 2); +INSERT INTO local_cold VALUES ('2025-08-01', 3), ('2025-06-15', 4); + +CREATE TABLE pruning_t +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +-- {echoOn} + +-- Test 1: 
Baseline (no pruning) — both segments planned, Union (Hybrid) present. +SELECT count() FROM pruning_t; +SELECT count() FROM pruning_t WHERE value > 0; +EXPLAIN SELECT count() FROM pruning_t WHERE value > 0; + +-- Test 2: Cold (additional) segment pruned via range contradiction — only base remains. +SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-10-01'; + +-- Test 3: Hot (base) segment pruned — only cold remains as a local plan. +SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts <= '2025-08-01'; + +-- Test 4: PREWHERE participates in pruning. +SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t PREWHERE ts > '2025-10-01'; + +-- Test 5: All segments pruned — getQueryProcessingStage returns FetchColumns, +-- planner inserts ReadNothing, AggregatingTransform synthesizes the default row. +SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; +EXPLAIN SELECT count() FROM pruning_t WHERE ts > '2025-12-01' AND ts <= '2025-08-01'; + +-- {echoOff} + +-- Setup for Test 6: three-segment table (hot / warm / cold); cold and warm are pruned, only hot is kept. 
+CREATE TABLE local_warm (ts DateTime, value UInt64) ENGINE = MergeTree ORDER BY ts; +INSERT INTO local_warm VALUES ('2025-09-15', 5), ('2025-09-25', 6); + +CREATE TABLE pruning_t3 +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_warm'), + ts > hybridParam('hybrid_watermark_cold', 'DateTime') + AND ts <= hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_cold', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-10-01', hybrid_watermark_cold = '2025-09-01' +AS local_hot; + +CREATE TABLE pruning_or +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'local_hot'), + ts > hybridParam('hybrid_watermark_hot', 'DateTime'), + remote('127.0.0.1:9000', currentDatabase(), 'local_cold'), + ts <= hybridParam('hybrid_watermark_hot', 'DateTime') +) +SETTINGS hybrid_watermark_hot = '2025-09-01' +AS local_hot; + +CREATE TABLE dim (id UInt64, label String) ENGINE = MergeTree ORDER BY id; +INSERT INTO dim VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'); + +-- {echoOn} + +-- Test 6: three segments, only hot survives. +SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_t3 WHERE ts > '2025-10-01'; + +-- Test 7: OR alternative is not a mandatory constraint — hot survives via the OR. +SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; +EXPLAIN SELECT count() FROM pruning_or WHERE (value = 1 OR value = 2) AND ts > '2025-10-01'; + +-- Test 8: JOIN — pruner conservatively skips, both segments planned. Only the count is +-- asserted here because EXPLAIN's JOIN-side ordering depends on randomized settings the +-- test harness cycles through (e.g. query_plan_join_swap_table). 
+SELECT count() +FROM pruning_t AS t +INNER JOIN dim AS d ON t.value = d.id +WHERE d.id > 1 AND t.ts <= '2025-08-01'; + +-- Test 9: SELECT alias shadows a Hybrid column used by segment predicates. With default +-- prefer_column_name_to_alias=0 the WHERE's `ts` resolves to the alias expression (a +-- constant true for every row); if the pruner mistakenly treated the unresolved `ts` as +-- the Hybrid column it would prune the cold segment (`ts <= '2025-09-01'`) and silently +-- drop those rows. All 4 rows must survive. +SELECT count() FROM (SELECT toDateTime('2025-11-01') AS ts, value FROM pruning_t WHERE ts > '2025-10-01'); + +-- {echoOff} + +DROP TABLE IF EXISTS dim SYNC; +DROP TABLE IF EXISTS pruning_or SYNC; +DROP TABLE IF EXISTS pruning_t3 SYNC; +DROP TABLE IF EXISTS pruning_t SYNC; +DROP TABLE IF EXISTS local_hot SYNC; +DROP TABLE IF EXISTS local_cold SYNC; +DROP TABLE IF EXISTS local_warm SYNC;