diff --git a/docs/en/operations/system-tables/named_scalars.md b/docs/en/operations/system-tables/named_scalars.md new file mode 100644 index 000000000000..4d631e23dc27 --- /dev/null +++ b/docs/en/operations/system-tables/named_scalars.md @@ -0,0 +1,132 @@ +--- +description: 'System table with one row per named cached scalar defined via CREATE NAMED SCALAR.' +keywords: ['system table', 'named_scalars'] +slug: /operations/system-tables/named_scalars +title: 'system.named_scalars' +doc_type: 'reference' +--- + +One row per named cached scalar defined via +[`CREATE NAMED SCALAR`](/sql-reference/statements/create/named-scalar). +Rows are visible to users with either the `getNamedScalar` function-execute +grant or the `SHOW_NAMED_SCALARS` grant. The `getNamedScalar` grant exposes +value-tier columns such as the name, current value, type, and freshness state. +The `SHOW_NAMED_SCALARS` grant additionally exposes operator-tier columns such +as the definition, definer, last error, and in-flight refresh state. + +## Columns + +Columns marked **value-tier** are populated for users with the `getNamedScalar` +function-execute grant. Columns marked **operator-tier** additionally require +`SHOW_NAMED_SCALARS` and are `NULL` when only the value-tier grant is present. + +| Column | Type | Tier | Description | +|---|---|---|---| +| `name` | String | value | Scalar name. | +| `kind` | Enum8(`'local'`=0, `'shared'`=1) | value | Cache kind: `'local'` (per-server) or `'shared'` (Keeper-backed). | +| `value` | Nullable(String) | value | Current value as string; NULL if no value has been produced yet. | +| `loading_start_time` | DateTime | value | Time the definition was loaded into memory. | +| `last_refresh_time` | Nullable(DateTime) | value | Time of the last refresh attempt (NULL if never attempted). | +| `next_refresh_time` | Nullable(DateTime) | value | Next scheduled refresh time; NULL for static scalars. | +| `last_success_time` | Nullable(DateTime) | value | Time of the last successful refresh. | +| `refresh_interval` | Nullable(UInt64) | value | Refresh period in seconds; NULL if no `REFRESH` clause. | +| `type` | Nullable(String) | value | ClickHouse type of the current value. | +| `has_value` | UInt8 | value | 1 if a last-good value exists; 0 otherwise. | +| `current_value_is_valid` | UInt8 | value | 1 if the most recent refresh succeeded; 0 if it threw. | +| `last_refresh_hostname` | Nullable(String) | operator | Host that performed the last refresh (rotates across replicas for `shared`). | +| `definer` | Nullable(String) | operator | User whose privileges are used to evaluate the scalar definition. | +| `expression` | Nullable(String) | operator | The scalar's source query. | +| `exception` | Nullable(String) | operator | Last refresh error, prefixed with `[ERROR_CODE_NAME]:`. | +| `refresh_in_flight` | Nullable(UInt8) | operator | 1 while a refresh body is currently executing; 0 otherwise. | +| `refresh_started_at` | Nullable(DateTime) | operator | Wall-clock start of the in-flight refresh; NULL when idle. | +| `consecutive_failures` | Nullable(UInt64) | operator | Failed refreshes since the last success; reset to 0 on success. 
| + +## Example + +```sql +SELECT name, kind, value, type, current_value_is_valid, consecutive_failures, exception +FROM system.named_scalars; +``` + +## Operational signals + +The following query returns named scalars that need operator attention: + +```sql +SELECT + kind, + name, + has_value, + current_value_is_valid, + refresh_in_flight, + refresh_started_at, + refresh_interval, + consecutive_failures, + exception, + expression +FROM system.named_scalars +WHERE has_value = 0 + OR ifNull(consecutive_failures, 0) > 10 + OR ( + ifNull(refresh_in_flight, 0) = 1 + AND refresh_interval IS NOT NULL + AND refresh_started_at < now() - toIntervalSecond(refresh_interval) + ) +ORDER BY + has_value ASC, + consecutive_failures DESC, + refresh_started_at ASC, + kind, + name; +``` + +The query reports: + +- scalars with no populated value (`has_value = 0`); queries using them will + throw `NAMED_SCALAR_HAS_NO_VALUE`; +- scalars with more than 10 consecutive refresh failures; +- scalars whose refresh body is still running after its refresh interval. + +`refresh_interval` is `NULL` for scalars without a `REFRESH` clause. Operator +columns such as `exception`, `expression`, `refresh_in_flight`, and +`consecutive_failures` are visible only with the `SHOW_NAMED_SCALARS` grant. + +:::note Reading scalar values is all-or-nothing +The `getNamedScalar` grant — required for reading scalar values via the +`getNamedScalar` UDF and for the value-tier columns of this table — is +**not per-scalar**. Any role that holds it can read every named scalar's +value. Do not store credentials, secrets, or otherwise narrowly-scoped +sensitive data in named scalars unless every grantee should be able to +read every scalar. +::: + +Cumulative refresh counters live in `system.events`: + +| Event | Meaning | +|---|---| +| `NamedScalarRefreshAttempts` | Refresh ticks that ran the SELECT. | +| `NamedScalarRefreshSuccesses` | Eval+publish OK. | +| `NamedScalarRefreshFailures` | Eval threw or persist failed. | +| `NamedScalarRefreshSkippedByPeer` | Shared scalar — another replica ran the refresh this tick. | +| `NamedScalarRefreshDurationMicroseconds` | Cumulative wall time of refresh bodies. | + +`system.metrics` exposes the dedicated refresh pool's gauges: +`BackgroundNamedScalarRefreshPoolTask` (active refresh tasks) and +`BackgroundNamedScalarRefreshPoolSize` (worker count limit; 0 until +the first `CREATE NAMED SCALAR` triggers lazy pool init). + +## Refresh visibility and cancellation + +Refresh bodies execute through the standard `executeQuery` path, which means: + +- **`system.processes`** lists in-flight refresh SELECTs while they run. + They can be interrupted with `KILL QUERY` (the scalar then records + `exception LIKE '%QUERY_WAS_CANCELLED%'` and the previous good value + remains served). +- **`system.query_log`** contains a `QueryFinish` (or `ExceptionWhileProcessing`) + row per refresh body, with `is_internal = 1`. Filter on + `is_internal = 1` if you want to see only refresh queries; exclude it + if you want to keep refresh queries out of operator dashboards. +- **DROP NAMED SCALAR**, **CREATE OR REPLACE NAMED SCALAR**, and + server shutdown all interrupt the in-flight refresh body so they don't + block on a slow SELECT — independent of `KILL QUERY`. 
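+
+As a sketch of this workflow, the queries below locate an in-flight refresh
+body, cancel it, and pull recent refresh outcomes from the query log. Matching
+`system.processes.query` against the stored `expression` is an assumption (the
+executed refresh text may be formatted differently than the stored definition),
+and the `query_id` literal is a placeholder.
+
+```sql
+-- In-flight refresh bodies (reading `expression` requires SHOW_NAMED_SCALARS).
+SELECT query_id, elapsed, query
+FROM system.processes
+WHERE query IN (SELECT expression FROM system.named_scalars WHERE refresh_in_flight = 1);
+
+-- Cancel one; the scalar keeps serving its previous good value.
+KILL QUERY WHERE query_id = 'query-id-from-above';
+
+-- Recent refresh outcomes (refresh bodies are logged with is_internal = 1).
+SELECT event_time, type, query_duration_ms, exception
+FROM system.query_log
+WHERE is_internal = 1
+  AND type IN ('QueryFinish', 'ExceptionWhileProcessing')
+ORDER BY event_time DESC
+LIMIT 20;
+```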
diff --git a/docs/en/sql-reference/functions/named-scalar-functions.md b/docs/en/sql-reference/functions/named-scalar-functions.md
new file mode 100644
index 000000000000..73b3a5034429
--- /dev/null
+++ b/docs/en/sql-reference/functions/named-scalar-functions.md
@@ -0,0 +1,120 @@
+---
+description: 'Functions for reading named cached scalar values'
+sidebar_label: 'Named Scalar'
+slug: /sql-reference/functions/named-scalar-functions
+title: 'Named Scalar Functions'
+doc_type: 'reference'
+---
+
+# Named Scalar Functions
+
+Functions for reading values of named cached scalars defined with
+[`CREATE NAMED SCALAR`](/sql-reference/statements/create/named-scalar).
+
+Both functions require the `getNamedScalar` function-execute grant. The
+underlying cache kind (`local` or `shared`) is transparent to the caller —
+the server dispatches to the correct backend automatically.
+
+## getNamedScalar
+
+Returns the current cached value of the named scalar.
+
+**Syntax**
+
+```sql
+getNamedScalar(name)
+```
+
+**Arguments**
+
+- `name` — scalar name, must be a constant string expression.
+
+**Returned value**
+
+The cached value with the type recorded at the time of the last successful
+refresh. If the scalar was created without a `REFRESH` clause (static scalar),
+the type is fixed at creation time.
+
+**Errors**
+
+| Error code | Condition |
+|---|---|
+| `NAMED_SCALAR_NOT_FOUND` | No scalar with this name is loaded on the server. |
+| `NAMED_SCALAR_HAS_NO_VALUE` | The scalar exists but has not yet produced a value (background populate is still running, or every attempt so far failed). |
+
+**Example**
+
+```sql
+-- Create and populate
+CREATE NAMED SCALAR fx_rate
+    REFRESH EVERY 1 HOUR
+    AS SELECT rate FROM rates WHERE pair = 'EUR/USD';
+
+-- Read
+SELECT amount * getNamedScalar('fx_rate') AS converted FROM orders;
+```
+
+**See also**
+
+- [`getNamedScalarOrDefault`](#getnamedscalarordefault) — returns a default value instead of throwing.
+- [`system.named_scalars`](/operations/system-tables/named_scalars) — runtime introspection.
+
+## getNamedScalarOrDefault {#getnamedscalarordefault}
+
+Returns the current cached value of the named scalar, or a caller-supplied
+default when the scalar is not found or has no value yet.
+
+**Syntax**
+
+```sql
+getNamedScalarOrDefault(name, default)
+```
+
+**Arguments**
+
+- `name` — scalar name, must be a constant string expression.
+- `default` — value to return when `getNamedScalar` would throw
+  `NAMED_SCALAR_NOT_FOUND` or `NAMED_SCALAR_HAS_NO_VALUE`.
+  May be any expression that is evaluable at query compile time.
+
+**Returned value**
+
+The cached scalar value, or `default` if the scalar is absent or has no value.
+The return type is the type of the cached value; when the default is used, it
+is cast to that type if possible.
+
+:::warning Return type follows the scalar's stored value
+The return type of `getNamedScalarOrDefault` is determined at query compile
+time from whichever of (cached scalar value, default) is available right
+then. Two executions of the same SQL can therefore see different return
+types: when the scalar is unpopulated the type comes from `default`; once
+the scalar populates, the type comes from the stored value.
+
+For type-stable contexts — views, prepared statements, replicated DDL
+bodies, expressions where the result is composed with another column —
+make sure the default expression's type matches the scalar's stored
+type, e.g.
+
+```sql
+-- prefer this:
+SELECT getNamedScalarOrDefault('p99_threshold', toUInt64(1000)) -- explicit UInt64 default
+-- over this:
+SELECT getNamedScalarOrDefault('p99_threshold', 1000) -- bare literal is inferred as UInt16; mismatches if the scalar is UInt64
+```
+:::
+
+**Example**
+
+```sql
+-- Returns 0 when 'flap' is not defined or not yet populated.
+SELECT getNamedScalarOrDefault('flap', 0);
+
+-- Fallback threshold while the scalar is initializing.
+SELECT count() FROM events
+WHERE latency_ms > getNamedScalarOrDefault('p99_threshold', 1000);
+```
+
+**See also**
+
+- [`getNamedScalar`](#getnamedscalar) — throws on missing or empty scalar.
+- [`system.named_scalars`](/operations/system-tables/named_scalars) — runtime introspection.
diff --git a/docs/en/sql-reference/statements/create/named-scalar.md b/docs/en/sql-reference/statements/create/named-scalar.md
new file mode 100644
index 000000000000..0863c32c1372
--- /dev/null
+++ b/docs/en/sql-reference/statements/create/named-scalar.md
@@ -0,0 +1,305 @@
+---
+description: 'Documentation for CREATE NAMED SCALAR (named cached scalar query result)'
+sidebar_label: 'NAMED SCALAR'
+sidebar_position: 42
+slug: /sql-reference/statements/create/named-scalar
+title: 'CREATE NAMED SCALAR'
+doc_type: 'reference'
+---
+
+Creates a *named cached scalar query result* — a `SELECT` whose single-row,
+single-column result is evaluated, cached, and served to subsequent queries as
+cheaply as a literal constant. Useful for small, frequently-read values that
+are expensive to recompute (aggregates over large tables, values fetched from
+remote sources, counters refreshed on a schedule).
+
+Reads go through `getNamedScalar` / `getNamedScalarOrDefault`. The manager
+dispatches to the local or shared (Keeper-backed) backend automatically based
+on the scalar's recorded cache kind; no separate function is needed for shared
+scalars.
+
+## Syntax
+
+```sql
+CREATE [OR REPLACE] [LOCAL|SHARED] NAMED SCALAR [IF NOT EXISTS] <name>
+    [ON CLUSTER <cluster>]
+    [DEFINER = { user | CURRENT_USER }] [SQL SECURITY DEFINER]
+    [REFRESH EVERY <N> <unit>]
+    AS <select_query>
+```
+/// DROP [SHARED] NAMED SCALAR [IF EXISTS] name [ON CLUSTER cluster]
+///
+/// SHARED rejects ON CLUSTER (the Keeper coordinator broadcasts to all
+/// replicas already). The REFRESH grammar is fixed at `EVERY <N> <unit>`
+/// where unit is SECOND/MINUTE/HOUR/DAY (with optional plural).
+class ParserNamedScalarDDLQuery : public IParserBase
+{
+protected:
+    const char * getName() const override { return "NAMED SCALAR DDL query"; }
+    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
+};
+
+}
diff --git a/src/Parsers/ParserQuery.cpp b/src/Parsers/ParserQuery.cpp
index 5d65cbc9dd93..d8e4059bd358 100644
--- a/src/Parsers/ParserQuery.cpp
+++ b/src/Parsers/ParserQuery.cpp
@@ -1,5 +1,6 @@
 #include
 #include
+#include <Parsers/ParserNamedScalarDDLQuery.h>
 #include
 #include
 #include
@@ -62,6 +63,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
     ParserCreateSettingsProfileQuery create_settings_profile_p;
     ParserCreateFunctionQuery create_function_p;
     ParserDropFunctionQuery drop_function_p;
+    ParserNamedScalarDDLQuery named_scalar_ddl_p;
     ParserCreateWorkloadQuery create_workload_p;
     ParserDropWorkloadQuery drop_workload_p;
     ParserCreateResourceQuery create_resource_p;
@@ -95,6 +97,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
         || create_settings_profile_p.parse(pos, node, expected)
         || create_function_p.parse(pos, node, expected)
         || drop_function_p.parse(pos, node, expected)
+        || named_scalar_ddl_p.parse(pos, node, expected)
         || create_workload_p.parse(pos, node, expected)
         || drop_workload_p.parse(pos, node, expected)
         || create_resource_p.parse(pos, node, expected)
diff --git a/src/Parsers/ParserSystemQuery.cpp b/src/Parsers/ParserSystemQuery.cpp
index 29ff87ba1afa..dac14950fe00 100644
--- a/src/Parsers/ParserSystemQuery.cpp
+++ b/src/Parsers/ParserSystemQuery.cpp
@@ -571,6 +571,28 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected & expected)
                 return false;
             break;
 
+        case Type::REFRESH_NAMED_SCALAR:
+        {
+            ASTPtr name_ast;
+            ParserIdentifier id_parser;
+            if (!id_parser.parse(pos, name_ast, expected))
+                return false;
+            if (!tryGetIdentifierNameInto(name_ast, res->named_scalar_name) || res->named_scalar_name.empty())
+                return false;
+            break;
+        }
+
+        case Type::START_NAMED_SCALAR_REFRESHES:
+        case Type::STOP_NAMED_SCALAR_REFRESHES:
+        {
+            /// Optional name; absent means "all scalars of the matching scope".
+            ASTPtr name_ast;
+            ParserIdentifier id_parser;
+            if (id_parser.parse(pos, name_ast, expected))
+                tryGetIdentifierNameInto(name_ast, res->named_scalar_name);
+            break;
+        }
+
         case Type::START_VIEWS:
         case Type::STOP_VIEWS:
         case Type::FREE_MEMORY:
diff --git a/src/Storages/System/StorageSystemNamedScalars.cpp b/src/Storages/System/StorageSystemNamedScalars.cpp
new file mode 100644
index 000000000000..e12a41cea398
--- /dev/null
+++ b/src/Storages/System/StorageSystemNamedScalars.cpp
@@ -0,0 +1,164 @@
+#include <Storages/System/StorageSystemNamedScalars.h>
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace
+{
+DataTypePtr makeKindEnum()
+{
+    return std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
+        {"local", 0},
+        {"shared", 1},
+    });
+}
+}
+
+StorageSystemNamedScalars::StorageSystemNamedScalars(const StorageID & storage_id_, ColumnsDescription columns_description_)
+    : IStorageSystemOneBlock(storage_id_, std::move(columns_description_))
+{
+}
+
+ColumnsDescription StorageSystemNamedScalars::getColumnsDescription()
+{
+    /// Two-tier disclosure:
+    ///  * Value tier (getNamedScalar grant) - name, value, freshness.
+    ///  * Operator tier (SHOW_NAMED_SCALARS) - body, errors, definer.
+    /// Nullable so the value-only viewer sees NULL, not partial truth.
+    return ColumnsDescription
+    {
+        {"name", std::make_shared<DataTypeString>(), "Named scalar name."},
+        {"kind", makeKindEnum(), "Named scalar kind: local or shared."},
+        {"value", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Last known value as string, NULL if missing."},
+        {"loading_start_time", std::make_shared<DataTypeDateTime>(), "Time when the named scalar definition was loaded into memory."},
+        {"last_refresh_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time of the last refresh attempt."},
+        {"next_refresh_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Next scheduled refresh time."},
+        {"last_success_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Time of the last successful refresh."},
+        {"refresh_interval", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "Refresh interval in seconds."},
+        {"type", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Type of the current named scalar value."},
+        {"has_value", std::make_shared<DataTypeUInt8>(), "Whether a last-good value exists."},
+        {"current_value_is_valid", std::make_shared<DataTypeUInt8>(), "Whether the last refresh attempt succeeded."},
+        /// --- Operator tier (SHOW_NAMED_SCALARS): NULL when the reader has only `getNamedScalar`.
+        {"last_refresh_hostname", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Hostname of the last refresher."},
+        {"definer", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "User the refresh runs as (SQL SECURITY DEFINER)."},
+        {"expression", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Named scalar definition expression."},
+        {"exception", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Last refresh error message (with error type prefix), if any."},
+        {"refresh_in_flight", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "1 if a refresh body is currently executing for this scalar."},
+        {"refresh_started_at", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "Wall-clock time the in-flight refresh started; NULL if not refreshing."},
+        {"consecutive_failures", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()), "Number of consecutive failed refreshes since the last successful one."},
+    };
+}
+
+void StorageSystemNamedScalars::fillData(
+    MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
+{
+    const auto access = context->getAccess();
+    const bool show_full = access->isGranted(AccessType::SHOW_NAMED_SCALARS);
+    const bool show_value_only = access->isGranted(AccessType::getNamedScalar);
+    if (!show_full && !show_value_only)
+        return;
+
+    for (const auto & scoped_scalar : context->getNamedScalarsManager().listScalars())
+    {
+        const auto & scalar = scoped_scalar.scalar;
+        const auto status = scalar->getInfo();
+
+        size_t col = 0;
+        // Value tier
+        res_columns[col++]->insert(scalar->getName());
+        res_columns[col++]->insert(static_cast<Int8>(scoped_scalar.cache_kind == NamedScalarCacheKind::Shared ? 1 : 0));
+
+        if (status.value)
+            res_columns[col++]->insert(applyVisitor(FieldVisitorToString(), status.value->value));
+        else
+            res_columns[col++]->insertDefault();
+
+        res_columns[col++]->insert(static_cast<UInt32>(std::chrono::system_clock::to_time_t(status.loading_start_time)));
+
+        if (status.last_refresh_time)
+            res_columns[col++]->insert(static_cast<UInt32>(std::chrono::system_clock::to_time_t(*status.last_refresh_time)));
+        else
+            res_columns[col++]->insertDefault();
+
+        if (status.refresh.next_refresh_time)
+            res_columns[col++]->insert(static_cast<UInt32>(std::chrono::system_clock::to_time_t(*status.refresh.next_refresh_time)));
+        else
+            res_columns[col++]->insertDefault();
+
+        if (status.last_success_time)
+            res_columns[col++]->insert(static_cast<UInt32>(std::chrono::system_clock::to_time_t(*status.last_success_time)));
+        else
+            res_columns[col++]->insertDefault();
+
+        if (status.refresh.refreshable)
+            res_columns[col++]->insert(status.refresh.period_seconds);
+        else
+            res_columns[col++]->insertDefault();
+
+        if (status.value && status.value->type)
+            res_columns[col++]->insert(status.value->type->getName());
+        else
+            res_columns[col++]->insertDefault();
+
+        res_columns[col++]->insert(static_cast<UInt8>(status.value.has_value()));
+        res_columns[col++]->insert(static_cast<UInt8>(status.value && status.value->is_valid));
+
+        // Operator tier - NULL when only the value-tier grant is present.
+        if (show_full && !status.last_refresh_hostname.empty())
+            res_columns[col++]->insert(status.last_refresh_hostname);
+        else
+            res_columns[col++]->insertDefault();
+
+        if (show_full && !status.definer.empty())
+            res_columns[col++]->insert(status.definer);
+        else
+            res_columns[col++]->insertDefault();
+
+        if (show_full && status.expression)
+            res_columns[col++]->insert(format({context, *status.expression}));
+        else
+            res_columns[col++]->insertDefault();
+
+        /// Embed the type as a "[CODE]: " prefix so a single Nullable column
+        /// covers what view_refreshes calls `exception`.
+        if (show_full && !status.last_error.empty())
+        {
+            String exception_text = status.last_error_type.empty()
+                ? status.last_error
+                : "[" + status.last_error_type + "]: " + status.last_error;
+            res_columns[col++]->insert(exception_text);
+        }
+        else
+            res_columns[col++]->insertDefault();
+
+        if (show_full)
+            res_columns[col++]->insert(static_cast<UInt8>(status.refresh.refresh_started_at.has_value()));
+        else
+            res_columns[col++]->insertDefault();
+
+        if (show_full && status.refresh.refresh_started_at)
+            res_columns[col++]->insert(static_cast<UInt32>(std::chrono::system_clock::to_time_t(*status.refresh.refresh_started_at)));
+        else
+            res_columns[col++]->insertDefault();
+
+        if (show_full)
+            res_columns[col++]->insert(status.refresh.consecutive_failures);
+        else
+            res_columns[col++]->insertDefault();
+    }
+}
+
+}
diff --git a/src/Storages/System/StorageSystemNamedScalars.h b/src/Storages/System/StorageSystemNamedScalars.h
new file mode 100644
index 000000000000..09771ebbfd5c
--- /dev/null
+++ b/src/Storages/System/StorageSystemNamedScalars.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <Storages/System/IStorageSystemOneBlock.h>
+
+namespace DB
+{
+
+class StorageSystemNamedScalars final : public IStorageSystemOneBlock
+{
+public:
+    StorageSystemNamedScalars(const StorageID & storage_id_, ColumnsDescription columns_description_);
+
+    std::string getName() const override { return "SystemNamedScalars"; }
+
+    static ColumnsDescription getColumnsDescription();
+
+protected:
+    void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
+};
+
+}
diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp
index e9a8672c106e..321e2385100a 100644
--- a/src/Storages/System/attachSystemTables.cpp
+++ b/src/Storages/System/attachSystemTables.cpp
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include <Storages/System/StorageSystemNamedScalars.h>
 #include
 #include
 #include
@@ -237,6 +238,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, bool has_zookeeper)
     attach<StorageSystemDDLWorkerQueue>(context, system_database, "distributed_ddl_queue", "Contains information about distributed DDL queries (ON CLUSTER clause) that were executed on a cluster.");
     attach<StorageSystemDistributionQueue>(context, system_database, "distribution_queue", "Contains information about local files that are in the queue to be sent to the shards. These local files contain new parts that are created by inserting new data into the Distributed table in asynchronous mode.");
     attach<StorageSystemDictionaries>(context, system_database, "dictionaries", "Contains information about dictionaries.");
+    attach<StorageSystemNamedScalars>(context, system_database, "named_scalars", "Contains information about named scalars.");
     attach<StorageSystemModels>(context, system_database, "models", "Contains a list of CatBoost models loaded into a LibraryBridge's memory along with time when it was loaded.");
     attach<StorageSystemClusters>(context, system_database, "clusters", "Contains information about clusters defined in the configuration file or generated by a Replicated database.");
     attach<StorageSystemGraphite>(context, system_database, "graphite_retentions", "Contains information about parameters graphite_rollup which are used in tables with *GraphiteMergeTree engines.");
diff --git a/tests/integration/test_shared_named_scalars_cluster/__init__.py b/tests/integration/test_shared_named_scalars_cluster/__init__.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/integration/test_shared_named_scalars_cluster/configs/config.xml b/tests/integration/test_shared_named_scalars_cluster/configs/config.xml
new file mode 100644
index 000000000000..0adb11b39b65
--- /dev/null
+++ b/tests/integration/test_shared_named_scalars_cluster/configs/config.xml
@@ -0,0 +1,3 @@
+<clickhouse>
+    <named_scalar_definitions_zookeeper_path>/clickhouse/shared_named_scalars_test</named_scalar_definitions_zookeeper_path>
+</clickhouse>
diff --git a/tests/integration/test_shared_named_scalars_cluster/configs/config_no_zk_path.xml b/tests/integration/test_shared_named_scalars_cluster/configs/config_no_zk_path.xml
new file mode 100644
index 000000000000..b8f61d435f4e
--- /dev/null
+++ b/tests/integration/test_shared_named_scalars_cluster/configs/config_no_zk_path.xml
@@ -0,0 +1,3 @@
+<clickhouse>
+
+</clickhouse>
diff --git a/tests/integration/test_shared_named_scalars_cluster/configs/users.xml b/tests/integration/test_shared_named_scalars_cluster/configs/users.xml
new file mode 100644
index 000000000000..239c0db976e6
--- /dev/null
+++ b/tests/integration/test_shared_named_scalars_cluster/configs/users.xml
@@ -0,0 +1,13 @@
+<clickhouse>
+    <profiles>
+        <default>
+            1
+            1
+        </default>
+    </profiles>
+    <users>
+        <default>
+            <profile>default</profile>
+        </default>
+    </users>
+</clickhouse>
diff --git a/tests/integration/test_shared_named_scalars_cluster/test.py b/tests/integration/test_shared_named_scalars_cluster/test.py
new file mode 100644
index 000000000000..15463e8b0d05
--- /dev/null
+++ b/tests/integration/test_shared_named_scalars_cluster/test.py
@@ -0,0 +1,841 @@
+import threading
+import time
+
+import pytest
+
+from helpers.client import QueryRuntimeException
+from helpers.cluster import ClickHouseCluster, ZOOKEEPER_CONTAINERS
+from helpers.network import PartitionManager
+from helpers.test_tools import assert_eq_with_retry
+
+cluster = ClickHouseCluster(__file__)
+
+node1 = cluster.add_instance(
+    "node1",
+    main_configs=["configs/config.xml"],
+    user_configs=["configs/users.xml"],
+    with_zookeeper=True,
+    stay_alive=True,
+)
+node2 = cluster.add_instance(
+    "node2",
+    main_configs=["configs/config.xml"],
+    user_configs=["configs/users.xml"],
+    with_zookeeper=True,
+    stay_alive=True,
+)
+# Third node without the shared named-scalars ZK path — used to assert that
+# cluster DDL errors cleanly when the config is missing.
+node_no_zk = cluster.add_instance( + "node_no_zk", + main_configs=["configs/config_no_zk_path.xml"], + user_configs=["configs/users.xml"], + with_zookeeper=True, +) + +nodes = [node1, node2] + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def _drop_all_shared_named_scalars(node): + rows = node.query( + "SELECT name FROM system.named_scalars WHERE kind = 'shared'" + ).strip() + for name in [r for r in rows.splitlines() if r]: + node.query(f"DROP NAMED SCALAR IF EXISTS {name}") + + +@pytest.fixture +def cleanup(started_cluster): + yield + # Best-effort cleanup from every node that might see the entry; + # propagation between tests should not leak state. + for node in nodes: + try: + _drop_all_shared_named_scalars(node) + except Exception: + pass + + +# -------- Regression locks: already green after step 2 -------- + +def test_zk_config_required(started_cluster, cleanup): + with pytest.raises(QueryRuntimeException) as exc: + node_no_zk.query("CREATE SHARED NAMED SCALAR nope AS SELECT 1") + err = str(exc.value) + assert "named_scalar_definitions_zookeeper_path" in err + # Round-15 split the error-code surface: the dedicated + # SHARED_NAMED_SCALARS_NOT_CONFIGURED (code 768) replaces the generic + # BAD_ARGUMENTS (36) for this path. Accept either to keep the test + # backward-compatible with builds that still throw the old code. + assert ( + "SHARED_NAMED_SCALARS_NOT_CONFIGURED" in err + or "768" in err + or "BAD_ARGUMENTS" in err + or "36" in err + ) + + +def test_single_node_create_read_drop(started_cluster, cleanup): + node1.query("CREATE SHARED NAMED SCALAR sn_foo AS SELECT toUInt64(7)") + assert node1.query("SELECT getNamedScalar('sn_foo')").strip() == "7" + node1.query("DROP NAMED SCALAR sn_foo") + err = node1.query_and_get_error("SELECT getNamedScalar('sn_foo')") + assert "NAMED_SCALAR_NOT_FOUND" in err + + +# -------- Step 3: coordinator + watches -------- + +def test_cross_node_discovery(started_cluster, cleanup): + node1.query("CREATE SHARED NAMED SCALAR xn_foo AS SELECT toUInt64(42)") + assert_eq_with_retry( + node2, + "SELECT getNamedScalar('xn_foo')", + "42\n", + retry_count=40, + sleep_time=0.25, + ) + + +def test_cross_node_drop(started_cluster, cleanup): + node1.query("CREATE SHARED NAMED SCALAR xd_foo AS SELECT toUInt64(1)") + assert_eq_with_retry(node2, "SELECT getNamedScalar('xd_foo')", "1\n") + + node1.query("DROP NAMED SCALAR xd_foo") + + def no_longer_exists(): + err = node2.query_and_get_error("SELECT getNamedScalar('xd_foo')") + return "NAMED_SCALAR_NOT_FOUND" in err + + deadline = time.time() + 10 + while time.time() < deadline: + if no_longer_exists(): + return + time.sleep(0.25) + pytest.fail("node2 still sees replicated xd_foo after DROP on node1") + + +def test_restart_picks_up_existing(started_cluster, cleanup): + node1.query("CREATE SHARED NAMED SCALAR rs_foo AS SELECT toUInt64(100)") + assert_eq_with_retry(node2, "SELECT getNamedScalar('rs_foo')", "100\n") + + node2.stop_clickhouse() + node2.start_clickhouse() + + # On boot, node2 must load from ZK — no watch event to rely on. 
+ assert_eq_with_retry( + node2, + "SELECT getNamedScalar('rs_foo')", + "100\n", + retry_count=40, + sleep_time=0.25, + ) + + +def test_create_or_replace_propagates(started_cluster, cleanup): + node1.query("CREATE SHARED NAMED SCALAR repl_v AS SELECT toUInt64(1)") + assert_eq_with_retry(node2, "SELECT getNamedScalar('repl_v')", "1\n") + + node1.query("CREATE OR REPLACE SHARED NAMED SCALAR repl_v AS SELECT toUInt64(2)") + assert_eq_with_retry(node2, "SELECT getNamedScalar('repl_v')", "2\n") + + +def test_system_table_sees_cluster_rows(started_cluster, cleanup): + node1.query("CREATE SHARED NAMED SCALAR st_foo AS SELECT toUInt64(5)") + assert_eq_with_retry(node2, "SELECT getNamedScalar('st_foo')", "5\n") + + for node in nodes: + row = node.query( + "SELECT name, kind, value, type FROM system.named_scalars " + "WHERE name = 'st_foo' AND kind = 'shared'" + ).strip() + assert row == "st_foo\tshared\t5\tUInt64", f"{node.name}: {row!r}" + + +# -------- Step 4: leader election + REFRESH -------- + +def test_refresh_clause_accepted(started_cluster, cleanup): + # After step 4 this must stop returning NOT_IMPLEMENTED. + node1.query( + "CREATE SHARED NAMED SCALAR rc_wm REFRESH EVERY 1 SECOND AS SELECT toUInt64(now())" + ) + + +def test_refresh_updates_value(started_cluster, cleanup): + node1.query( + "CREATE SHARED NAMED SCALAR rv_wm REFRESH EVERY 1 SECOND AS SELECT toUInt64(now())" + ) + assert_eq_with_retry( + node2, + "SELECT getNamedScalar('rv_wm') > 0", + "1\n", + retry_count=40, + sleep_time=0.25, + ) + + first = int(node1.query("SELECT getNamedScalar('rv_wm')").strip()) + time.sleep(3) + later_n1 = int(node1.query("SELECT getNamedScalar('rv_wm')").strip()) + later_n2 = int(node2.query("SELECT getNamedScalar('rv_wm')").strip()) + assert later_n1 > first + assert later_n2 > first + + +def test_refresh_single_leader_per_tick(started_cluster, cleanup): + """Every tick exactly one node writes to ZK (ephemeral lock serialises writers). + The winner may alternate between ticks — we just assert every observed + hostname is one of the real cluster members.""" + node1.query( + "CREATE SHARED NAMED SCALAR sl_wm REFRESH EVERY 1 SECOND AS SELECT toUInt64(now())" + ) + assert_eq_with_retry(node2, "SELECT getNamedScalar('sl_wm') > 0", "1\n") + + valid_hosts = {node1.hostname, node2.hostname} + samples = [] + for _ in range(8): + host = node1.query( + "SELECT last_refresh_hostname FROM system.named_scalars " + "WHERE name = 'sl_wm' AND kind = 'shared'" + ).strip() + samples.append(host) + time.sleep(1) + + assert all(h in valid_hosts for h in samples), (samples, valid_hosts) + + +def test_system_refresh_variable(started_cluster, cleanup): + node1.query( + "CREATE SHARED NAMED SCALAR sr_wm REFRESH EVERY 36500 DAYS AS SELECT toUInt64(now())" + ) + assert_eq_with_retry(node2, "SELECT getNamedScalar('sr_wm') > 0", "1\n") + first = int(node2.query("SELECT getNamedScalar('sr_wm')").strip()) + time.sleep(1) + node1.query("SYSTEM REFRESH NAMED SCALAR sr_wm") + assert_eq_with_retry( + node2, + f"SELECT getNamedScalar('sr_wm') > {first}", + "1\n", + retry_count=40, + sleep_time=0.25, + ) + + +def test_refreshable_create_publishes_initialized_entry(started_cluster, cleanup): + """CREATE pauses after in-memory publish. 
SYSTEM REFRESH during that pause + must not report "not refreshable".""" + + errors = [] + saw_refresh_success = False + last_refresh_error = "" + + def create_variable(): + try: + node1.query( + "CREATE SHARED NAMED SCALAR init_pub REFRESH EVERY 36500 DAYS AS SELECT toUInt64(now())" + ) + except Exception as exc: + errors.append(exc) + + node1.query("SYSTEM ENABLE FAILPOINT named_scalar_create_after_publish_pause") + creator = threading.Thread(target=create_variable) + creator.start() + try: + deadline = time.time() + 30 + while time.time() < deadline: + if errors: + break + + try: + node1.query("SYSTEM REFRESH NAMED SCALAR init_pub") + saw_refresh_success = True + break + except Exception as exc: + message = str(exc) + last_refresh_error = message + if "not refreshable" in message: + raise + if "NAMED_SCALAR_NOT_FOUND" not in message: + # Keep polling while CREATE is still wiring the entry. + pass + + time.sleep(0.1) + finally: + node1.query("SYSTEM DISABLE FAILPOINT named_scalar_create_after_publish_pause") + creator.join(timeout=30) + + assert not creator.is_alive() + if errors: + raise errors[0] + assert saw_refresh_success, last_refresh_error or "SYSTEM REFRESH NAMED SCALAR never succeeded during paused CREATE" + + assert_eq_with_retry( + node2, + "SELECT getNamedScalar('init_pub') > 0", + "1\n", + retry_count=40, + sleep_time=0.25, + ) + + +def test_refresh_failover(started_cluster, cleanup): + node1.query( + "CREATE SHARED NAMED SCALAR fo_wm REFRESH EVERY 2 SECOND AS SELECT toUInt64(now())" + ) + assert_eq_with_retry(node2, "SELECT getNamedScalar('fo_wm') > 0", "1\n") + + node1.stop_clickhouse(kill=True) + try: + # Wait out the ephemeral-lock holder's ZK session. Default session timeout + # for Kazoo-style helpers in this repo is a few seconds; budget generously. + deadline = time.time() + 60 + seen = None + while time.time() < deadline: + v = node2.query("SELECT getNamedScalar('fo_wm')").strip() + if seen is None: + seen = v + elif v != seen: + return # value moved; some replica is refreshing now + time.sleep(1) + pytest.fail("no refresh observed on node2 after node1 was killed") + finally: + node1.start_clickhouse() + + +# -------- Pass 2: races, ZK disruption, discovery churn -------- + + +def test_duplicate_create_cluster_race(started_cluster, cleanup): + """Two nodes CREATE the same shared scalar concurrently. + Exactly one succeeds; the other sees NAMED_SCALAR_ALREADY_EXISTS. + Both end up seeing the winner's value.""" + + results = {} + + def create(node, tag): + try: + node.query( + f"CREATE SHARED NAMED SCALAR race_cv AS SELECT toUInt64({tag})" + ) + results[tag] = "ok" + except Exception as exc: + results[tag] = str(exc) + + t1 = threading.Thread(target=create, args=(node1, 1)) + t2 = threading.Thread(target=create, args=(node2, 2)) + t1.start() + t2.start() + t1.join() + t2.join() + + outcomes = list(results.values()) + ok_count = sum(1 for o in outcomes if o == "ok") + # The loser either hits NAMED_SCALAR_ALREADY_EXISTS (if the winner's ZK create + # landed first) or the per-variable publish lock (if the winner is still + # writing definition + value atomically). Either outcome proves the two + # CREATEs were serialised. 
+ err_count = sum( + 1 for o in outcomes + if "NAMED_SCALAR_ALREADY_EXISTS" in o + or "currently creating or refreshing" in o + ) + assert ok_count == 1 and err_count == 1, results + + winner_tag = next(t for t, o in results.items() if o == "ok") + expected = str(winner_tag) + for node in nodes: + assert_eq_with_retry( + node, + "SELECT getNamedScalar('race_cv')", + expected + "\n", + retry_count=40, + sleep_time=0.25, + ) + + +def test_create_or_replace_while_refreshing(started_cluster, cleanup): + """OR REPLACE in the middle of a refresh cycle must converge both nodes + on the new value and drop the REFRESH schedule.""" + + node1.query( + "CREATE SHARED NAMED SCALAR repl_race REFRESH EVERY 1 SECOND AS SELECT toUInt64(now())" + ) + assert_eq_with_retry(node2, "SELECT getNamedScalar('repl_race') > 0", "1\n") + + # Let several ticks happen so the scheduler is genuinely active. + time.sleep(2) + + node2.query( + "CREATE OR REPLACE SHARED NAMED SCALAR repl_race AS SELECT toUInt64(42)" + ) + + for node in nodes: + assert_eq_with_retry( + node, + "SELECT getNamedScalar('repl_race')", + "42\n", + retry_count=40, + sleep_time=0.25, + ) + + # Stable at 42 across a window that would have covered several refresh ticks + # if the schedule had survived. + time.sleep(3) + for node in nodes: + assert node.query( + "SELECT getNamedScalar('repl_race')" + ).strip() == "42" + + +def test_create_cluster_rolls_back_on_value_store_failure(started_cluster, cleanup): + """If initial value publication fails, CREATE must fail and leave no + definition-only ghost variable in Keeper.""" + + node1.query("SYSTEM ENABLE FAILPOINT shared_named_scalars_store_value_fail_once") + try: + err = node1.query_and_get_error( + "CREATE SHARED NAMED SCALAR publish_fail_cv AS SELECT toUInt64(11)" + ) + finally: + # ONCE failpoints auto-disable after trigger, but disable explicitly in + # case CREATE failed before reaching the injection point. 
+ node1.query("SYSTEM DISABLE FAILPOINT shared_named_scalars_store_value_fail_once") + + assert ( + "Injected failure while storing shared scalar" in err + or "KEEPER_EXCEPTION" in err + ), err + + for node in nodes: + + def missing(n=node): + error = n.query_and_get_error( + "SELECT getNamedScalar('publish_fail_cv')" + ) + return "NAMED_SCALAR_NOT_FOUND" in error + + deadline = time.time() + 10 + while time.time() < deadline and not missing(): + time.sleep(0.25) + assert missing(), f"{node.name} still sees replicated publish_fail_cv" + + node1.query("CREATE SHARED NAMED SCALAR publish_fail_cv AS SELECT toUInt64(11)") + for node in nodes: + assert_eq_with_retry( + node, + "SELECT getNamedScalar('publish_fail_cv')", + "11\n", + retry_count=40, + sleep_time=0.25, + ) + + +def test_cluster_refresh_failure_flap(started_cluster, cleanup): + """Refresh fails on the leader after its dependency is dropped; the last-good + value + last_error propagate via ZK to the peer; recovery clears the error.""" + + for node in nodes: + node.query("DROP TABLE IF EXISTS default.flap_src SYNC") + node.query("CREATE TABLE default.flap_src (x UInt8) ENGINE = Memory") + node.query("INSERT INTO default.flap_src VALUES (42)") + + node1.query( + "CREATE SHARED NAMED SCALAR flap_cv REFRESH EVERY 1 SECOND " + "AS (SELECT max(x) FROM default.flap_src)" + ) + assert_eq_with_retry(node1, "SELECT getNamedScalar('flap_cv')", "42\n") + assert_eq_with_retry(node2, "SELECT getNamedScalar('flap_cv')", "42\n") + + for node in nodes: + node.query("DROP TABLE default.flap_src SYNC") + + def is_valid_flipped(node): + row = node.query( + "SELECT has_value, current_value_is_valid, coalesce(exception,'') != '' " + "FROM system.named_scalars " + "WHERE name = 'flap_cv' AND kind = 'shared'" + ).strip() + return row == "1\t0\t1" + + deadline = time.time() + 30 + while time.time() < deadline: + if all(is_valid_flipped(n) for n in nodes): + break + time.sleep(0.5) + else: + pytest.fail( + "flap did not propagate: " + + "; ".join( + f"{n.name}: " + + n.query( + "SELECT has_value, current_value_is_valid, exception " + "FROM system.named_scalars WHERE name='flap_cv'" + ).strip() + for n in nodes + ) + ) + + # Last-good value is still visible on both nodes. + for node in nodes: + assert node.query( + "SELECT getNamedScalar('flap_cv')" + ).strip() == "42" + + # Recovery: recreate table + data, expect current_value_is_valid back to 1 within a few ticks. + for node in nodes: + node.query("CREATE TABLE default.flap_src (x UInt8) ENGINE = Memory") + node.query("INSERT INTO default.flap_src VALUES (7)") + + def recovered(node): + row = node.query( + "SELECT current_value_is_valid, coalesce(exception,'') = '' " + "FROM system.named_scalars " + "WHERE name = 'flap_cv' AND kind = 'shared'" + ).strip() + return row == "1\t1" + + deadline = time.time() + 30 + while time.time() < deadline: + if all(recovered(n) for n in nodes): + for node in nodes: + node.query("DROP TABLE IF EXISTS default.flap_src SYNC") + return + time.sleep(0.5) + pytest.fail("recovery did not happen in time") + +def test_zk_disconnect_reads_continue(started_cluster, cleanup): + """Partition node2 from ZK. getNamedScalar on node2 must keep serving the + cached value. 
After the partition heals, a new write from node1 reaches + node2 again.""" + + node1.query( + "CREATE SHARED NAMED SCALAR zkd_cv AS SELECT toUInt64(100)" + ) + assert_eq_with_retry(node2, "SELECT getNamedScalar('zkd_cv')", "100\n") + + with PartitionManager() as pm: + pm.drop_instance_zk_connections(node2) + # Cached reads keep working even while ZK is unreachable. + for _ in range(5): + assert node2.query( + "SELECT getNamedScalar('zkd_cv')" + ).strip() == "100" + time.sleep(0.5) + + # After partition heals, a new write on node1 should propagate. + node1.query( + "CREATE OR REPLACE SHARED NAMED SCALAR zkd_cv AS SELECT toUInt64(101)" + ) + assert_eq_with_retry( + node2, + "SELECT getNamedScalar('zkd_cv')", + "101\n", + retry_count=80, + sleep_time=0.25, + ) + + +def test_zk_session_loss_re_enumerate(started_cluster, cleanup): + """Stop Keeper long enough for the ZK session to expire, restart it, and + assert the coordinator on both nodes re-enumerates — a fresh CREATE on + node1 must reach node2 without a node restart.""" + + node1.query( + "CREATE SHARED NAMED SCALAR sess_cv AS SELECT toUInt64(1)" + ) + assert_eq_with_retry(node2, "SELECT getNamedScalar('sess_cv')", "1\n") + + cluster.stop_zookeeper_nodes(ZOOKEEPER_CONTAINERS) + # Session timeout for the integration fixture is ~30 s; wait past it. + time.sleep(35) + cluster.start_zookeeper_nodes(ZOOKEEPER_CONTAINERS) + + # After the session returns, the coordinator should reconnect and a new + # CREATE on node1 must reach node2. + node1.query_with_retry( + "CREATE OR REPLACE SHARED NAMED SCALAR sess_cv AS SELECT toUInt64(2)", + retry_count=40, + sleep_time=1.0, + ) + assert_eq_with_retry( + node2, + "SELECT getNamedScalar('sess_cv')", + "2\n", + retry_count=60, + sleep_time=1.0, + ) + + +def test_restart_during_refresh_no_leak(started_cluster, cleanup): + """Kill the node most likely to hold the ephemeral refresh lock; after + restart it should rejoin cleanly and the variable's value should keep + advancing without any node getting stuck on a stale hostname.""" + + node1.query( + "CREATE SHARED NAMED SCALAR rlk_cv REFRESH EVERY 1 SECOND AS SELECT toUInt64(now())" + ) + assert_eq_with_retry(node2, "SELECT getNamedScalar('rlk_cv') > 0", "1\n") + + node1.stop_clickhouse(kill=True) + try: + time.sleep(3) + # While node1 is down, node2 must keep refreshing. + before = int(node2.query("SELECT getNamedScalar('rlk_cv')").strip()) + time.sleep(3) + after = int(node2.query("SELECT getNamedScalar('rlk_cv')").strip()) + assert after > before, (before, after) + finally: + node1.start_clickhouse() + + # After restart node1 sees the variable and the value continues to advance. + assert_eq_with_retry(node1, "SELECT getNamedScalar('rlk_cv') > 0", "1\n") + v1 = int(node1.query("SELECT getNamedScalar('rlk_cv')").strip()) + time.sleep(3) + v2 = int(node1.query("SELECT getNamedScalar('rlk_cv')").strip()) + assert v2 > v1, (v1, v2) + + +def test_or_replace_drops_refresh_schedule_on_peer(started_cluster, cleanup): + """Reported by codex review. The peer's coordinator fast-path skipped rebuilds + when the AST hash of the expression matched. 
If OR REPLACE only removed the + REFRESH clause (keeping the same expression), the peer kept its old scheduler + alive and continued publishing refreshed values via ZK, violating the new + definition that says 'no refresh'.""" + + node1.query( + "CREATE SHARED NAMED SCALAR rs_strip REFRESH EVERY 1 SECOND AS SELECT toUInt64(now())" + ) + assert_eq_with_retry(node2, "SELECT getNamedScalar('rs_strip') > 0", "1\n") + + # Let ticks run on both sides so both nodes have a live refresh scheduler. + time.sleep(2) + + # Same expression, REFRESH removed. Hash of expression is unchanged — the + # fast-path used to short-circuit and leave the old scheduler alive. + node1.query( + "CREATE OR REPLACE SHARED NAMED SCALAR rs_strip AS SELECT toUInt64(now())" + ) + + # Give any lingering scheduler ticks a chance to fire. + time.sleep(3) + + # After OR REPLACE the value is whatever CREATE evaluated once — not a + # moving target. Sample on each node across another window; value must be + # identical on each sample. + for node in nodes: + snapshots = [] + for _ in range(4): + snapshots.append(node.query("SELECT getNamedScalar('rs_strip')").strip()) + time.sleep(1) + assert len(set(snapshots)) == 1, (node.name, snapshots) + + # system.named_scalars.refresh_interval is NULL for a non-refreshable var. + for node in nodes: + row = node.query( + "SELECT refresh_interval IS NULL FROM system.named_scalars " + "WHERE name = 'rs_strip' AND kind = 'shared'" + ).strip() + assert row == "1", (node.name, row) + + +def test_drop_while_discovery_in_flight(started_cluster, cleanup): + """On node1, create a batch of shared named_scalars; simultaneously drop one + of them from node2. All other entries must eventually be visible on node2 + and the coordinator must keep processing subsequent DDL.""" + + names = [f"batch_cv_{i:02d}" for i in range(20)] + target = names[7] + + # Seed the target first so that DROP on node2 has something to race with. + node1.query(f"CREATE SHARED NAMED SCALAR {target} AS SELECT toUInt64(999)") + assert_eq_with_retry( + node2, f"SELECT getNamedScalar('{target}')", "999\n" + ) + + def create_batch(): + for name in names: + if name == target: + continue + node1.query(f"CREATE SHARED NAMED SCALAR {name} AS SELECT toUInt64(1)") + + def drop_target(): + time.sleep(0.05) # small head start for create_batch + node2.query(f"DROP NAMED SCALAR IF EXISTS {target}") + + t1 = threading.Thread(target=create_batch) + t2 = threading.Thread(target=drop_target) + t1.start() + t2.start() + t1.join() + t2.join() + + # Non-target entries must end up visible on both nodes. + for node in nodes: + for name in names: + if name == target: + continue + assert_eq_with_retry( + node, + f"SELECT getNamedScalar('{name}')", + "1\n", + retry_count=40, + sleep_time=0.25, + ) + + # Target is gone on both nodes. + for node in nodes: + + def target_gone(n=node): + err = n.query_and_get_error( + f"SELECT getNamedScalar('{target}')" + ) + return "NAMED_SCALAR_NOT_FOUND" in err + + deadline = time.time() + 10 + while time.time() < deadline and not target_gone(): + time.sleep(0.25) + assert target_gone(), ( + f"{node.name} still sees replicated {target} after DROP" + ) + + # Coordinator is still alive: further DDL goes through. 
+ node1.query("CREATE SHARED NAMED SCALAR batch_post AS SELECT toUInt64(1)") + assert_eq_with_retry( + node2, "SELECT getNamedScalar('batch_post')", "1\n" + ) + + +# -------- LOCAL persistence (replaces stateless 03802 / 03807 / 03810) -------- +# These properties used to live in stateless tests driven by clickhouse-local, +# but timing-coupled assertions against a short-lived local instance are +# fragile under CI load. Real stop_clickhouse() / start_clickhouse() with +# assert_eq_with_retry is the proper harness. + + +def _drop_all_local_named_scalars(node): + rows = node.query( + "SELECT name FROM system.named_scalars WHERE kind = 'local'" + ).strip() + for name in [r for r in rows.splitlines() if r]: + node.query(f"DROP NAMED SCALAR IF EXISTS {name}") + + +@pytest.fixture +def cleanup_local(): + yield + try: + _drop_all_local_named_scalars(node1) + except Exception: + pass + + +def test_local_restart_reloads_state(started_cluster, cleanup_local): + """LOCAL scalar definitions and last-good values are loaded synchronously + during server startup, and the refresh task resumes ticking afterwards.""" + + node1.query("DROP NAMED SCALAR IF EXISTS lr_cv") + node1.query("DROP TABLE IF EXISTS default.lr_src SYNC") + node1.query("CREATE TABLE default.lr_src (x UInt64) ENGINE=MergeTree() ORDER BY x") + node1.query("INSERT INTO default.lr_src VALUES (123)") + node1.query( + "CREATE NAMED SCALAR lr_cv REFRESH EVERY 1 SECOND " + "AS (SELECT max(x) FROM default.lr_src)" + ) + assert node1.query("SELECT getNamedScalar('lr_cv')").strip() == "123" + + node1.stop_clickhouse() + node1.start_clickhouse() + + # Reload is sync: definition + last-good value visible immediately. + assert node1.query("SELECT getNamedScalar('lr_cv')").strip() == "123" + + # Refresh task resumes; advance the value and observe the new one. + node1.query("INSERT INTO default.lr_src VALUES (200)") + assert_eq_with_retry( + node1, "SELECT getNamedScalar('lr_cv')", "200\n", + retry_count=60, sleep_time=0.25, + ) + + node1.query("DROP TABLE default.lr_src SYNC") + + +def test_persistent_cadence_resumes_schedule(started_cluster, cleanup_local): + """Refresh schedule resumes from the persisted last_successful_update_time + instead of the restart moment. Without the fix, EVERY 1 HOUR would defer + the next tick by ~1 hour even if it was almost due.""" + + node1.query("DROP NAMED SCALAR IF EXISTS pc_cv") + node1.query( + "CREATE NAMED SCALAR pc_cv REFRESH EVERY 1 HOUR AS SELECT toUInt64(1)" + ) + + node1.stop_clickhouse() + + # Rewrite the persisted timestamps to "3570 seconds ago" so the next tick + # under EVERY 1 HOUR should land ~30 s after restart. + rewrite_script = ( + "import os, re, time\n" + "root = '/var/lib/clickhouse/named_scalars_cache'\n" + "for f in os.listdir(root):\n" + " p = os.path.join(root, f)\n" + " t = open(p).read()\n" + " target = int(time.time()) - 3570\n" + " t = re.sub(r'^last_update_time: \\d+$', " + "f'last_update_time: {target}', t, count=1, flags=re.MULTILINE)\n" + " t = re.sub(r'^last_successful_update_time: \\d+$', " + "f'last_successful_update_time: {target}', t, count=1, flags=re.MULTILINE)\n" + " open(p, 'w').write(t)\n" + ) + node1.exec_in_container(["python3", "-c", rewrite_script], user="root") + + node1.start_clickhouse() + + # Next refresh should land within ~5 minutes; anything close to 3600 + # would mean the schedule reset to "now". 
+ next_in = int(node1.query( + "SELECT toInt64(next_refresh_time) - toInt64(now()) " + "FROM system.named_scalars WHERE name = 'pc_cv' AND kind = 'local'" + ).strip()) + assert next_in < 300, f"next_refresh_time is {next_in} s away (expected < 300)" + + +def test_creator_database_normalization(started_cluster, cleanup_local): + """Unqualified table refs in the CREATE NAMED SCALAR body are rewritten + with the creator's current database. After restart, SYSTEM REFRESH from a + process whose default database is `default` still resolves correctly.""" + + node1.query("DROP NAMED SCALAR IF EXISTS cd_cv") + node1.query("DROP DATABASE IF EXISTS cd_db SYNC") + node1.query("CREATE DATABASE cd_db") + node1.query("CREATE TABLE cd_db.t (x UInt64) ENGINE=MergeTree() ORDER BY x") + node1.query("INSERT INTO cd_db.t VALUES (10), (20), (30)") + + # CREATE in cd_db; refer to `t` unqualified. + node1.query( + "CREATE NAMED SCALAR cd_cv REFRESH EVERY 36500 DAYS AS (SELECT sum(x) FROM t)", + database="cd_db", + ) + assert node1.query("SELECT getNamedScalar('cd_cv')").strip() == "60" + + node1.stop_clickhouse() + node1.start_clickhouse() + + # SYSTEM REFRESH from default database; without normalization this would + # try to resolve `t` against `default` and fail with UNKNOWN_TABLE. + node1.query("SYSTEM REFRESH NAMED SCALAR cd_cv") + assert_eq_with_retry( + node1, + "SELECT current_value_is_valid, coalesce(exception, '') = '' " + "FROM system.named_scalars WHERE name = 'cd_cv'", + "1\t1\n", + retry_count=60, sleep_time=0.25, + ) + + node1.query("DROP DATABASE cd_db SYNC") diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index ec8c1b6aa923..5e40f5d97e06 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -68,6 +68,7 @@ CREATE FUNCTION [] GLOBAL CREATE CREATE WORKLOAD [] GLOBAL CREATE CREATE RESOURCE [] GLOBAL CREATE CREATE NAMED COLLECTION [] NAMED_COLLECTION NAMED COLLECTION ADMIN +CREATE NAMED SCALAR [] GLOBAL CREATE CREATE [] \N ALL DROP DATABASE [] DATABASE DROP DROP TABLE [] TABLE DROP @@ -77,6 +78,7 @@ DROP FUNCTION [] GLOBAL DROP DROP WORKLOAD [] GLOBAL DROP DROP RESOURCE [] GLOBAL DROP DROP NAMED COLLECTION [] NAMED_COLLECTION NAMED COLLECTION ADMIN +DROP NAMED SCALAR [] GLOBAL DROP DROP [] \N ALL UNDROP TABLE [] TABLE ALL TRUNCATE ['TRUNCATE TABLE'] TABLE ALL @@ -197,9 +199,13 @@ SYSTEM LOAD PRIMARY KEY ['SYSTEM LOAD PRIMARY KEY'] TABLE SYSTEM SYSTEM UNLOAD PRIMARY KEY ['SYSTEM UNLOAD PRIMARY KEY'] TABLE SYSTEM SYSTEM INSTRUMENT ADD ['SYSTEM INSTRUMENT ADD'] GLOBAL SYSTEM SYSTEM INSTRUMENT REMOVE ['SYSTEM INSTRUMENT REMOVE'] GLOBAL SYSTEM +SYSTEM REFRESH NAMED SCALAR ['SYSTEM REFRESH NAMED SCALAR'] GLOBAL SYSTEM +SYSTEM NAMED SCALAR REFRESHES ['SYSTEM START NAMED SCALAR REFRESHES','SYSTEM STOP NAMED SCALAR REFRESHES'] GLOBAL SYSTEM SYSTEM [] \N ALL dictGet ['dictHas','dictGetHierarchy','dictIsIn'] DICTIONARY ALL displaySecretsInShowAndSelect [] GLOBAL ALL +getNamedScalar ['getNamedScalarOrDefault'] GLOBAL ALL +SHOW NAMED SCALARS [] GLOBAL ALL addressToLine [] GLOBAL INTROSPECTION addressToLineWithInlines [] GLOBAL INTROSPECTION addressToSymbol [] GLOBAL INTROSPECTION diff --git a/tests/queries/0_stateless/03800_named_scalars.reference b/tests/queries/0_stateless/03800_named_scalars.reference new file mode 100644 index 000000000000..3a635767db7d --- /dev/null +++ b/tests/queries/0_stateless/03800_named_scalars.reference @@ -0,0 +1,19 @@ +1 
+cv_test local 1 UInt32 1 1 +2 +x +String +x +1 +1 +42 +7 +7 +42 +fallback +1 +11 +0 +7 +1 +99 diff --git a/tests/queries/0_stateless/03800_named_scalars.sql b/tests/queries/0_stateless/03800_named_scalars.sql new file mode 100644 index 000000000000..0b1e9b15d248 --- /dev/null +++ b/tests/queries/0_stateless/03800_named_scalars.sql @@ -0,0 +1,91 @@ +-- Tags: no-parallel +SET allow_experimental_named_scalars = 1; + +DROP NAMED SCALAR IF EXISTS cv_test; +DROP NAMED SCALAR IF EXISTS cv_ref; +DROP NAMED SCALAR IF EXISTS cv_big; +DROP NAMED SCALAR IF EXISTS cv_fixed; +DROP NAMED SCALAR IF EXISTS cv_const; +DROP NAMED SCALAR IF EXISTS cv_local; + +-- Basic CRUD: CREATE, value/type, system table row. +CREATE NAMED SCALAR cv_test AS SELECT toUInt32(1); +SELECT getNamedScalar('cv_test'); +SELECT name, kind, value, type, has_value, current_value_is_valid +FROM system.named_scalars WHERE name = 'cv_test' ORDER BY name; + +-- OR REPLACE swaps cleanly across incompatible types. +CREATE OR REPLACE NAMED SCALAR cv_test AS SELECT toUInt32(2); +SELECT getNamedScalar('cv_test'); +CREATE OR REPLACE NAMED SCALAR cv_test AS SELECT 'x'; +SELECT getNamedScalar('cv_test'); +SELECT toTypeName(getNamedScalar('cv_test')); + +-- Self-reference and SQL SECURITY non-DEFINER are rejected. +CREATE NAMED SCALAR cv_ref AS SELECT getNamedScalar('cv_test'); -- {serverError BAD_ARGUMENTS} +CREATE NAMED SCALAR cv_ref AS SELECT getNamedScalarOrDefault('cv_test', toUInt32(0)); -- {serverError BAD_ARGUMENTS} +CREATE NAMED SCALAR cv_ref SQL SECURITY INVOKER AS SELECT toUInt32(1); -- {serverError BAD_ARGUMENTS} +CREATE NAMED SCALAR cv_ref SQL SECURITY NONE AS SELECT toUInt32(1); -- {serverError BAD_ARGUMENTS} + +-- Missing scalar / duplicate name / IF NOT EXISTS. +SELECT getNamedScalar('missing'); -- {serverError NAMED_SCALAR_NOT_FOUND} +CREATE NAMED SCALAR cv_test AS SELECT toUInt32(3); -- {serverError NAMED_SCALAR_ALREADY_EXISTS} +CREATE NAMED SCALAR IF NOT EXISTS cv_test AS SELECT toUInt32(4); +SELECT getNamedScalar('cv_test'); + +DROP NAMED SCALAR cv_test; +DROP NAMED SCALAR cv_test; -- {serverError NAMED_SCALAR_NOT_FOUND} +DROP NAMED SCALAR IF EXISTS cv_test; + +-- Value-size cap. +CREATE NAMED SCALAR cv_big AS SELECT repeat('x', 2 * 1024 * 1024); -- {serverError TOO_LARGE_STRING_SIZE} + +-- Existence check fires before SELECT eval, so IF NOT EXISTS / duplicate +-- are no-ops / clean errors even when the new SELECT is invalid. +CREATE NAMED SCALAR cv_fixed AS SELECT toUInt32(1); +CREATE NAMED SCALAR IF NOT EXISTS cv_fixed AS (SELECT * FROM nonexistent_table); +SELECT getNamedScalar('cv_fixed'); +CREATE NAMED SCALAR cv_fixed AS (SELECT * FROM nonexistent_table); -- {serverError NAMED_SCALAR_ALREADY_EXISTS} +SELECT getNamedScalar('cv_fixed'); +DROP NAMED SCALAR cv_fixed; + +-- Constant SELECTs are allowed for non-refreshable scalars. +CREATE NAMED SCALAR cv_const AS SELECT 42; +SELECT getNamedScalar('cv_const'); +DROP NAMED SCALAR cv_const; + +-- LOCAL is an explicit cache-kind modifier (equivalent to default). 
+CREATE LOCAL NAMED SCALAR cv_local AS SELECT toUInt16(7); +SELECT getNamedScalar('cv_local'); +DROP NAMED SCALAR cv_local; + +-- ----- getNamedScalarOrDefault paths (was 03803) ----- +DROP NAMED SCALAR IF EXISTS cv_defined; +CREATE NAMED SCALAR cv_defined AS SELECT toUInt32(7); +SELECT getNamedScalarOrDefault('cv_defined', toUInt32(0)); +SELECT getNamedScalarOrDefault('missing', toUInt32(42)); +SELECT getNamedScalarOrDefault('missing', 'fallback'); +SELECT getNamedScalarOrDefault('missing', NULL) IS NULL; +SELECT getNamedScalarOrDefault('missing', toUInt32(11)); +DROP NAMED SCALAR cv_defined; + +-- ----- CREATE rollback when initial SELECT throws (was 03811) ----- +DROP NAMED SCALAR IF EXISTS cv_rollback; +CREATE NAMED SCALAR cv_rollback AS SELECT throwIf(1, 'eval_failed'); -- {serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO} +-- Catalog must be empty for this name. +SELECT count() FROM system.named_scalars WHERE name = 'cv_rollback'; +SELECT getNamedScalar('cv_rollback'); -- {serverError NAMED_SCALAR_NOT_FOUND} +-- Retry with a working SELECT must succeed (orphan blob from first attempt is cleaned up). +CREATE NAMED SCALAR cv_rollback AS SELECT toUInt32(7); +SELECT getNamedScalar('cv_rollback'); +DROP NAMED SCALAR cv_rollback; + +-- ----- Duplicate CREATE doesn't poison the slot (was 03813) ----- +DROP NAMED SCALAR IF EXISTS cv_orphan; +CREATE NAMED SCALAR cv_orphan AS SELECT toUInt32(1); +CREATE NAMED SCALAR cv_orphan AS SELECT toUInt32(2); -- {serverError NAMED_SCALAR_ALREADY_EXISTS} +SELECT getNamedScalar('cv_orphan'); +DROP NAMED SCALAR cv_orphan; +CREATE NAMED SCALAR cv_orphan AS SELECT toUInt32(99); +SELECT getNamedScalar('cv_orphan'); +DROP NAMED SCALAR cv_orphan; diff --git a/tests/queries/0_stateless/03801_named_scalars_refresh.reference b/tests/queries/0_stateless/03801_named_scalars_refresh.reference new file mode 100644 index 000000000000..c6cee0f42edc --- /dev/null +++ b/tests/queries/0_stateless/03801_named_scalars_refresh.reference @@ -0,0 +1,6 @@ +local cv_refresh 1 1 +1 +1 0 1 1 +1 +1 1 1 +2 diff --git a/tests/queries/0_stateless/03801_named_scalars_refresh.sh b/tests/queries/0_stateless/03801_named_scalars_refresh.sh new file mode 100755 index 000000000000..0a061a8898fc --- /dev/null +++ b/tests/queries/0_stateless/03801_named_scalars_refresh.sh @@ -0,0 +1,93 @@ +#!/usr/bin/env bash +# Tags: no-parallel +# +# Refresh-task lifecycle for local named scalars: scheduled refresh fires; +# missing-table flap preserves the last-good value with current_value_is_valid=0 +# and an exception column populated; recovery clears the error. +# Async refresh => poll system.named_scalars; never sleep on a fixed deadline. + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh +CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_named_scalars=1" + +# Poll a query until it returns the expected output (one or more lines, +# tab-separated columns). Times out after 15 s; on timeout, prints whatever +# the last attempt returned so the diff vs. reference is informative. 
+wait_for() {
+    local expected="$1"
+    local query="$2"
+    local deadline=$((SECONDS + 15))
+    local actual=""
+    while [ "$SECONDS" -lt "$deadline" ]; do
+        actual=$(${CLICKHOUSE_CLIENT} -q "$query" 2>/dev/null)
+        if [ "$actual" = "$expected" ]; then
+            echo "$expected"
+            return 0
+        fi
+        sleep 0.25
+    done
+    echo "$actual"
+    return 1
+}
+
+${CLICKHOUSE_CLIENT} -m -q "
+DROP NAMED SCALAR IF EXISTS cv_refresh;
+DROP NAMED SCALAR IF EXISTS cv_flap;
+DROP TABLE IF EXISTS default.cv_flap_src;
+"
+
+# -------- cv_refresh: scheduled refresh fires and the row appears valid --------
+${CLICKHOUSE_CLIENT} -m -q "
+CREATE NAMED SCALAR cv_refresh REFRESH EVERY 1 SECOND AS SELECT now();
+SYSTEM REFRESH NAMED SCALAR cv_refresh;
+"
+wait_for "local cv_refresh 1 1" "
+SELECT kind, name, has_value, current_value_is_valid
+FROM system.named_scalars
+WHERE kind = 'local' AND name = 'cv_refresh'
+ORDER BY name
+"
+
+# -------- cv_flap: backing table flaps; last-good preserved across the gap --------
+${CLICKHOUSE_CLIENT} -m -q "
+CREATE TABLE default.cv_flap_src (x UInt8) ENGINE = Memory;
+INSERT INTO default.cv_flap_src VALUES (7);
+CREATE NAMED SCALAR cv_flap REFRESH EVERY 36500 DAYS AS (SELECT count() FROM default.cv_flap_src);
+"
+${CLICKHOUSE_CLIENT} -q "SELECT getNamedScalar('cv_flap')"
+
+# Drop the table; force a refresh; the scalar enters a stale-with-error state
+# while keeping the previous good value visible.
+${CLICKHOUSE_CLIENT} -m -q "
+DROP TABLE default.cv_flap_src;
+SYSTEM REFRESH NAMED SCALAR cv_flap;
+"
+wait_for "1 0 1 1" "
+SELECT has_value, current_value_is_valid, coalesce(exception, '') != '' AS has_error,
+       last_success_time > toDateTime('1970-01-01 00:00:00', 'UTC') AS kept_prior_success
+FROM system.named_scalars
+WHERE kind = 'local' AND name = 'cv_flap'
+"
+# During the outage, getNamedScalar still serves the last-good value.
+${CLICKHOUSE_CLIENT} -q "SELECT getNamedScalar('cv_flap')"
+
+# Recovery: re-create the table; force a refresh; current_value_is_valid
+# returns to 1 and exception is cleared.
+${CLICKHOUSE_CLIENT} -m -q "
+CREATE TABLE default.cv_flap_src (x UInt8) ENGINE = Memory;
+INSERT INTO default.cv_flap_src VALUES (1), (2);
+SYSTEM REFRESH NAMED SCALAR cv_flap;
+"
+wait_for "1 1 1" "
+SELECT has_value, current_value_is_valid, coalesce(exception, '') = '' AS cleared_error
+FROM system.named_scalars
+WHERE kind = 'local' AND name = 'cv_flap'
+"
+${CLICKHOUSE_CLIENT} -q "SELECT getNamedScalar('cv_flap')"
+
+${CLICKHOUSE_CLIENT} -m -q "
+DROP NAMED SCALAR cv_refresh;
+DROP NAMED SCALAR cv_flap;
+DROP TABLE IF EXISTS default.cv_flap_src;
+"
diff --git a/tests/queries/0_stateless/03802_named_scalars_grammar.reference b/tests/queries/0_stateless/03802_named_scalars_grammar.reference
new file mode 100644
index 000000000000..fe30df283b79
--- /dev/null
+++ b/tests/queries/0_stateless/03802_named_scalars_grammar.reference
@@ -0,0 +1,2 @@
+1
+UInt32
diff --git a/tests/queries/0_stateless/03802_named_scalars_grammar.sql b/tests/queries/0_stateless/03802_named_scalars_grammar.sql
new file mode 100644
index 000000000000..3ad6bc0357f9
--- /dev/null
+++ b/tests/queries/0_stateless/03802_named_scalars_grammar.sql
@@ -0,0 +1,47 @@
+-- Tags: no-parallel
+SET allow_experimental_named_scalars = 1;
+
+DROP NAMED SCALAR IF EXISTS cv_kind_local;
+
+-- ----- SHARED requires Keeper config; without it a clear error fires
+-- before any other validation (was 03804 + 03806).
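+-- A cheap way to observe the gate interactively (illustrative, not asserted
+-- here): without Keeper configured no shared entry can exist, so
+--   SELECT count() FROM system.named_scalars WHERE kind = 'shared';
+-- returns 0.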
+CREATE SHARED NAMED SCALAR foo AS SELECT 1; -- {serverError SHARED_NAMED_SCALARS_NOT_CONFIGURED}
+CREATE OR REPLACE SHARED NAMED SCALAR foo AS SELECT toUInt64(5); -- {serverError SHARED_NAMED_SCALARS_NOT_CONFIGURED}
+CREATE SHARED NAMED SCALAR IF NOT EXISTS foo REFRESH EVERY 1 MINUTE AS SELECT now(); -- {serverError SHARED_NAMED_SCALARS_NOT_CONFIGURED}
+
+-- ----- Grammar rejections (was 03804) -----
+DROP SHARED NAMED SCALAR foo; -- {clientError SYNTAX_ERROR}
+DROP SHARED NAMED SCALAR IF EXISTS foo; -- {clientError SYNTAX_ERROR}
+DROP NAMED SCALAR foo; -- {serverError NAMED_SCALAR_NOT_FOUND}
+DROP NAMED SCALAR IF EXISTS foo;
+
+-- ON CLUSTER + SHARED is rejected by the interpreter (Keeper already distributes shared scalars).
+CREATE SHARED NAMED SCALAR foo ON CLUSTER x AS SELECT 1; -- {serverError SYNTAX_ERROR}
+DROP SHARED NAMED SCALAR foo ON CLUSTER x; -- {clientError SYNTAX_ERROR}
+
+-- TEMPORARY is not supported.
+CREATE TEMPORARY NAMED SCALAR t AS SELECT 1; -- {clientError SYNTAX_ERROR}
+
+-- Bare identifier required for scalar names; no compound names.
+CREATE NAMED SCALAR foo.bar AS SELECT 1; -- {clientError SYNTAX_ERROR}
+DROP NAMED SCALAR foo.bar; -- {clientError SYNTAX_ERROR}
+
+-- LOCAL is accepted as an explicit cache-kind modifier.
+CREATE LOCAL NAMED SCALAR local_foo AS SELECT 1;
+DROP NAMED SCALAR local_foo;
+
+-- ----- SYSTEM commands for unknown name (was 03806) -----
+SYSTEM REFRESH NAMED SCALAR never_existed; -- {serverError NAMED_SCALAR_NOT_FOUND}
+SYSTEM STOP NAMED SCALAR REFRESHES never_existed; -- {serverError NAMED_SCALAR_NOT_FOUND}
+SYSTEM START NAMED SCALAR REFRESHES never_existed; -- {serverError NAMED_SCALAR_NOT_FOUND}
+SYSTEM REFRESH TEMPORARY NAMED SCALAR anything; -- {clientError SYNTAX_ERROR}
+
+-- ----- OR REPLACE that changes cache kind is rejected (was 03812) -----
+-- Without Keeper, ensureCreatable fires before the kind-change check, so both
+-- BAD_ARGUMENTS and SHARED_NAMED_SCALARS_NOT_CONFIGURED are acceptable here.
+-- Integration tests cover the BAD_ARGUMENTS path with Keeper configured.
+CREATE LOCAL NAMED SCALAR cv_kind_local AS SELECT toUInt32(1);
+CREATE OR REPLACE SHARED NAMED SCALAR cv_kind_local AS SELECT toUInt32(2); -- {serverError BAD_ARGUMENTS,SHARED_NAMED_SCALARS_NOT_CONFIGURED}
+SELECT getNamedScalar('cv_kind_local');
+SELECT toTypeName(getNamedScalar('cv_kind_local'));
+DROP NAMED SCALAR cv_kind_local;
diff --git a/tests/queries/0_stateless/03805_named_scalars_access.reference b/tests/queries/0_stateless/03805_named_scalars_access.reference
new file mode 100644
index 000000000000..3331851422ae
--- /dev/null
+++ b/tests/queries/0_stateless/03805_named_scalars_access.reference
@@ -0,0 +1,17 @@
+create_without_grant=DENIED
+set_definer_without_create_grant=DENIED
+drop_without_grant=DENIED
+bare_drop=DENIED
+get_without_grant=DENIED
+7
+7
+0
+1
+sys_refresh_without_grant=DENIED
+system_alias_grants=OK
+readonly_blocks_create=DENIED
+set_definer_without_grant=DENIED
+definer_recorded=OK
+value_tier_exception_null=1
+value_tier_leak=NO_OK
+operator_tier_sees_full=YES_OK
diff --git a/tests/queries/0_stateless/03805_named_scalars_access.sh b/tests/queries/0_stateless/03805_named_scalars_access.sh
new file mode 100755
index 000000000000..abe25a2eeae4
--- /dev/null
+++ b/tests/queries/0_stateless/03805_named_scalars_access.sh
@@ -0,0 +1,125 @@
+#!/usr/bin/env bash
+# Tags: no-parallel
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
"$CURDIR"/../shell_config.sh +CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_named_scalars=1" +CLICKHOUSE_LOCAL="$CLICKHOUSE_LOCAL --allow_experimental_named_scalars=1" + +TEST_DB="${CLICKHOUSE_DATABASE}" +U_CREATE="cv_create_${TEST_DB}" +U_DROP="cv_drop_${TEST_DB}" +U_GET="cv_get_${TEST_DB}" +U_SYS="cv_sys_${TEST_DB}" +U_RO="cv_ro_${TEST_DB}" +U_DEFINER="cv_definer_${TEST_DB}" + +cleanup() { + $CLICKHOUSE_CLIENT --query "DROP NAMED SCALAR IF EXISTS access_cv" + $CLICKHOUSE_CLIENT --query "DROP NAMED SCALAR IF EXISTS access_user_definer" + $CLICKHOUSE_CLIENT --query "DROP USER IF EXISTS ${U_CREATE}, ${U_DROP}, ${U_GET}, ${U_SYS}, ${U_RO}, ${U_DEFINER}" +} +trap cleanup EXIT +cleanup + +$CLICKHOUSE_CLIENT --query "CREATE USER ${U_CREATE}, ${U_DROP}, ${U_GET}, ${U_SYS}, ${U_RO}, ${U_DEFINER}" + +# Seed a scalar under the default (unrestricted) user. REFRESH EVERY is set +# so that SYSTEM REFRESH below reaches the access check rather than failing +# with NOT_REFRESHABLE; the body uses a non-constant expression because +# REFRESH on a constant SELECT is rejected at install time. +$CLICKHOUSE_CLIENT --query "CREATE NAMED SCALAR access_cv REFRESH EVERY 1 HOUR AS SELECT toUInt32(7) + toUInt32(rand() * 0)" + +# --- CREATE_NAMED_SCALAR required for CREATE NAMED SCALAR ------------------------- +$CLICKHOUSE_CLIENT --user ${U_CREATE} --query "CREATE NAMED SCALAR access_user AS SELECT 1" 2>&1 \ + | grep -q ACCESS_DENIED && echo "create_without_grant=DENIED" || echo "create_without_grant=ALLOWED" + +$CLICKHOUSE_CLIENT --user ${U_CREATE} --query "CREATE NAMED SCALAR access_user_definer DEFINER = ${U_DEFINER} AS SELECT toUInt32(9)" 2>&1 \ + | grep -q ACCESS_DENIED && echo "set_definer_without_create_grant=DENIED" || echo "set_definer_without_create_grant=ALLOWED" + +$CLICKHOUSE_CLIENT --query "GRANT CREATE NAMED SCALAR ON *.* TO ${U_CREATE}" +$CLICKHOUSE_CLIENT --user ${U_CREATE} --query "CREATE NAMED SCALAR access_user AS SELECT toUInt32(1)" 2>&1 | head -1 +$CLICKHOUSE_CLIENT --user ${U_CREATE} --query "DROP NAMED SCALAR access_user" 2>&1 \ + | grep -q ACCESS_DENIED && echo "drop_without_grant=DENIED" || echo "drop_without_grant=ALLOWED" +$CLICKHOUSE_CLIENT --query "DROP NAMED SCALAR access_user" + +# --- DROP_NAMED_SCALAR required for DROP NAMED SCALAR ----------------------------- +$CLICKHOUSE_CLIENT --user ${U_DROP} --query "DROP NAMED SCALAR access_cv" 2>&1 \ + | grep -q ACCESS_DENIED && echo "bare_drop=DENIED" || echo "bare_drop=ALLOWED" + +# --- getNamedScalar requires getNamedScalar access ------------------------------ +$CLICKHOUSE_CLIENT --user ${U_GET} --query "SELECT getNamedScalar('access_cv')" 2>&1 \ + | grep -q ACCESS_DENIED && echo "get_without_grant=DENIED" || echo "get_without_grant=ALLOWED" + +$CLICKHOUSE_CLIENT --query "GRANT getNamedScalar ON *.* TO ${U_GET}" +$CLICKHOUSE_CLIENT --user ${U_GET} --query "SELECT getNamedScalar('access_cv')" + +# One getNamedScalar grant covers both read functions. +$CLICKHOUSE_CLIENT --user ${U_GET} --query "SELECT getNamedScalarOrDefault('access_cv', toUInt32(0))" +$CLICKHOUSE_CLIENT --user ${U_GET} --query "SELECT getNamedScalarOrDefault('missing', toUInt32(0))" + +# system.named_scalars is visible to users with either SHOW_NAMED_SCALARS +# or getNamedScalar; asserting the getNamedScalar branch pins that fallback. 
+$CLICKHOUSE_CLIENT --user ${U_GET} --query \
+    "SELECT count() FROM system.named_scalars WHERE kind = 'local' AND name = 'access_cv'"
+
+# --- SYSTEM REFRESH NAMED SCALAR requires the grant of the same name --------
+$CLICKHOUSE_CLIENT --user ${U_SYS} --query "SYSTEM REFRESH NAMED SCALAR access_cv" 2>&1 \
+    | grep -q ACCESS_DENIED && echo "sys_refresh_without_grant=DENIED" || echo "sys_refresh_without_grant=ALLOWED"
+
+$CLICKHOUSE_CLIENT --query "GRANT SYSTEM REFRESH NAMED SCALAR ON *.* TO ${U_SYS}"
+$CLICKHOUSE_CLIENT --user ${U_SYS} --query "SYSTEM REFRESH NAMED SCALAR access_cv" 2>&1 | head -1
+$CLICKHOUSE_CLIENT --query "GRANT SYSTEM START NAMED SCALAR REFRESHES ON *.* TO ${U_SYS}"
+$CLICKHOUSE_CLIENT --query "GRANT SYSTEM STOP NAMED SCALAR REFRESHES ON *.* TO ${U_SYS}"
+echo "system_alias_grants=OK"
+
+# --- Readonly profile blocks CREATE NAMED SCALAR ------------------------------
+$CLICKHOUSE_CLIENT --query "GRANT CREATE NAMED SCALAR ON *.* TO ${U_RO}"
+$CLICKHOUSE_CLIENT --user ${U_RO} --readonly 1 --query "CREATE NAMED SCALAR access_ro AS SELECT 1" 2>&1 \
+    | grep -qE "ACCESS_DENIED|READONLY" && echo "readonly_blocks_create=DENIED" || echo "readonly_blocks_create=ALLOWED"
+
+# --- SET DEFINER required for another definer user ------------------------------
+$CLICKHOUSE_CLIENT --user ${U_CREATE} --query "CREATE NAMED SCALAR access_user_definer DEFINER = ${U_DEFINER} AS SELECT toUInt32(9)" 2>&1 \
+    | grep -q ACCESS_DENIED && echo "set_definer_without_grant=DENIED" || echo "set_definer_without_grant=ALLOWED"
+
+$CLICKHOUSE_CLIENT --query "GRANT SET DEFINER ON ${U_DEFINER} TO ${U_CREATE}"
+$CLICKHOUSE_CLIENT --user ${U_CREATE} --query "CREATE NAMED SCALAR access_user_definer DEFINER = ${U_DEFINER} AS SELECT toUInt32(9)"
+$CLICKHOUSE_CLIENT --query "SELECT definer = '${U_DEFINER}' FROM system.named_scalars WHERE kind = 'local' AND name = 'access_user_definer'" \
+    | grep -q 1 && echo "definer_recorded=OK" || echo "definer_recorded=UNEXPECTED"
+
+# --- Exception tier-gating: the definer's error text in
+# system.named_scalars.exception is operator-tier; readers with
+# only getNamedScalar must see NULL, not the definer's error text.
+SECRET_TBL="redacted_${CLICKHOUSE_TEST_UNIQUE_NAME:-default}"
+$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS default.${SECRET_TBL}"
+$CLICKHOUSE_CLIENT --query "CREATE TABLE default.${SECRET_TBL} (x UInt8) ENGINE = Memory"
+$CLICKHOUSE_CLIENT --query "INSERT INTO default.${SECRET_TBL} VALUES (1)"
+$CLICKHOUSE_CLIENT --query "CREATE NAMED SCALAR access_redacted REFRESH EVERY 36500 DAYS AS SELECT count() FROM default.${SECRET_TBL}"
+$CLICKHOUSE_CLIENT --query "DROP TABLE default.${SECRET_TBL}"
+$CLICKHOUSE_CLIENT --query "SYSTEM REFRESH NAMED SCALAR access_redacted"
+# Wait for the failure to land.
+for _ in $(seq 1 30); do
+    last=$($CLICKHOUSE_CLIENT --query "SELECT coalesce(exception, '') FROM system.named_scalars WHERE name = 'access_redacted'")
+    [ -n "$last" ] && break
+    sleep 0.1
+done
+# Value-tier reader (getNamedScalar only): exception column must be NULL,
+# never expose the definer's error text.
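+# (For an operator-tier reader the column carries the full text in the
+# "[ERROR_CODE_NAME]: message" convention, so a plausible, purely illustrative
+# value here is "[UNKNOWN_TABLE]: Table default.redacted_... does not exist".
+# Only the presence/absence of the table name is asserted, not the exact text.)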
+$CLICKHOUSE_CLIENT --user ${U_GET} --query \
+    "SELECT exception IS NULL FROM system.named_scalars WHERE name='access_redacted'" \
+    | tr '\n' ' ' | sed 's/ $//; s/^/value_tier_exception_null=/'
+echo
+$CLICKHOUSE_CLIENT --user ${U_GET} --query \
+    "SELECT exception FROM system.named_scalars WHERE name='access_redacted'" 2>&1 \
+    | grep -q "${SECRET_TBL}" && echo "value_tier_leak=YES_BUG" || echo "value_tier_leak=NO_OK"
+# Operator-tier reader (SHOW_NAMED_SCALARS): exception is populated.
+U_SHOW="u_show_redacted_${CLICKHOUSE_TEST_UNIQUE_NAME:-default}"
+$CLICKHOUSE_CLIENT --query "DROP USER IF EXISTS ${U_SHOW}"
+$CLICKHOUSE_CLIENT --query "CREATE USER ${U_SHOW} IDENTIFIED WITH no_password"
+$CLICKHOUSE_CLIENT --query "GRANT SHOW NAMED SCALARS ON *.* TO ${U_SHOW}"
+$CLICKHOUSE_CLIENT --user ${U_SHOW} --query \
+    "SELECT exception FROM system.named_scalars WHERE name='access_redacted'" \
+    | grep -q "${SECRET_TBL}" && echo "operator_tier_sees_full=YES_OK" || echo "operator_tier_sees_full=NO_BUG"
+$CLICKHOUSE_CLIENT --query "DROP USER ${U_SHOW}"
+$CLICKHOUSE_CLIENT --query "DROP NAMED SCALAR access_redacted"
diff --git a/tests/queries/0_stateless/03808_getNamedScalar_snapshot.reference b/tests/queries/0_stateless/03808_getNamedScalar_snapshot.reference
new file mode 100644
index 000000000000..6ed281c757a9
--- /dev/null
+++ b/tests/queries/0_stateless/03808_getNamedScalar_snapshot.reference
@@ -0,0 +1,2 @@
+1
+1
diff --git a/tests/queries/0_stateless/03808_getNamedScalar_snapshot.sql b/tests/queries/0_stateless/03808_getNamedScalar_snapshot.sql
new file mode 100644
index 000000000000..14ec7693096f
--- /dev/null
+++ b/tests/queries/0_stateless/03808_getNamedScalar_snapshot.sql
@@ -0,0 +1,19 @@
+-- Tags: no-parallel
+SET allow_experimental_named_scalars = 1;
+
+-- Query-snapshot semantics: the value resolved at analysis time is stable
+-- for the whole query, even across multiple reads. Also exercises the
+-- defined-scalar path of getNamedScalarOrDefault.
+-- Refresh-fires-after-scheduled-tick is covered by 03801.
+
+DROP NAMED SCALAR IF EXISTS snap_v;
+
+CREATE NAMED SCALAR snap_v REFRESH EVERY 1 SECOND AS SELECT toUInt64(now());
+
+-- Two reads of the same scalar in one query must return the same value.
+SELECT getNamedScalar('snap_v') = getNamedScalar('snap_v');
+
+-- Defined scalar path captures value/type before execution.
+SELECT getNamedScalarOrDefault('snap_v', toUInt64(0)) > 0;
+
+DROP NAMED SCALAR snap_v;
diff --git a/tests/queries/0_stateless/03809_named_scalars_orreplace_under_refresh.reference b/tests/queries/0_stateless/03809_named_scalars_orreplace_under_refresh.reference
new file mode 100644
index 000000000000..bf9ab62e8e57
--- /dev/null
+++ b/tests/queries/0_stateless/03809_named_scalars_orreplace_under_refresh.reference
@@ -0,0 +1,3 @@
+1
+0
+0 1 1
diff --git a/tests/queries/0_stateless/03809_named_scalars_orreplace_under_refresh.sh b/tests/queries/0_stateless/03809_named_scalars_orreplace_under_refresh.sh
new file mode 100755
index 000000000000..ad114eac59a3
--- /dev/null
+++ b/tests/queries/0_stateless/03809_named_scalars_orreplace_under_refresh.sh
@@ -0,0 +1,90 @@
+#!/usr/bin/env bash
+# Tags: no-parallel, long
+#
+# Exercise the OR-REPLACE-vs-running-refresh fence and the Slot lifecycle.
+#
+# What this catches:
+# - Stale refresh body publishing OLD .bin over a fresh CREATE OR REPLACE
+#   (epoch fence in NamedScalarSlot::runTask). If the fence is broken,
+#   the final value can be `i_old < N` rather than `N` because an old
+#   body's atomic-store ran AFTER the new install.
+# - Slot lifecycle leaks (BackgroundNamedScalarRefreshPoolTask not returning to
+#   0 after the storm settles, or Slot destruction deadlocking on
+#   deactivate-self).
+#
+# Shape: tight CREATE OR REPLACE loop with REFRESH EVERY 1 SECOND. Each
+# iteration writes a different SELECT constant. While the loop runs,
+# refresh ticks fire on whichever entry is current; if a tick lands
+# during an OR REPLACE the fence must discard the old refresh's result.
+# After the loop and a settle period, the final value must equal the
+# last iteration.
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_named_scalars=1"
+CLICKHOUSE_LOCAL="$CLICKHOUSE_LOCAL --allow_experimental_named_scalars=1"
+
+NAME="cv_replace_under_refresh_${CLICKHOUSE_TEST_UNIQUE_NAME:-default}"
+N=100
+
+cleanup() { ${CLICKHOUSE_CLIENT} --query "DROP NAMED SCALAR IF EXISTS ${NAME}"; }
+trap cleanup EXIT
+cleanup
+
+# Initial CREATE — refresh wires up immediately. REFRESH EVERY 1 SECOND
+# is fast enough that several ticks land during the loop below. The
+# `now() < ...` factor produces 1 unconditionally but keeps the
+# expression non-constant so the REFRESH clause is accepted (constant
+# expressions are explicitly rejected for refreshable scalars).
+expr_for() { echo "SELECT toUInt64(now() < toDateTime('2200-01-01')) * $1"; }
+${CLICKHOUSE_CLIENT} --query "CREATE NAMED SCALAR ${NAME} REFRESH EVERY 1 SECOND AS $(expr_for 0)"
+
+# Tight loop of OR REPLACE — each iteration installs a new definition
+# whose value is the iteration counter. Refresh ticks fire on whichever
+# definition is current; the fence must discard any old-body publish
+# that lands after the next install.
+for i in $(seq 1 ${N}); do
+    ${CLICKHOUSE_CLIENT} --query "CREATE OR REPLACE NAMED SCALAR ${NAME} REFRESH EVERY 1 SECOND AS $(expr_for ${i})"
+done
+
+# Let any in-flight refresh body finish and any deactivation queue settle.
+# 3 seconds covers the 1-second cadence plus the longest plausible eval +
+# publish window for these trivial expressions.
+${CLICKHOUSE_CLIENT} --query "SELECT sleep(3) FORMAT Null"
+${CLICKHOUSE_CLIENT} --query "SELECT sleep(1) FORMAT Null"
+
+# Assertion 1: final value matches the last OR REPLACE. If the fence
+# leaked, an old definition's body could have published its smaller
+# constant after the last install; getNamedScalar would then return < N.
+${CLICKHOUSE_CLIENT} --query "SELECT getNamedScalar('${NAME}') = toUInt64(${N}) AS final_value_is_last_create"
+
+# Assertion 2: no orphaned refresh body. The Slot's task holder is
+# deactivated synchronously by Runtime::drop / Runtime::shutdown, but a
+# leak in the install path (loser slot from a non-linearizable
+# installDefinition, say) could keep a ghost task firing. The dedicated
+# refresh pool's task gauge must read 0 after the settle period. Poll
+# for it instead of taking a single shot, to avoid flake when a refresh
+# tick fires between the sleep above and this read.
+gauge="1"
+for _ in $(seq 1 30); do
+    gauge=$(${CLICKHOUSE_CLIENT} --query "SELECT value FROM system.metrics WHERE metric = 'BackgroundNamedScalarRefreshPoolTask'")
+    [ "$gauge" = "0" ] && break
+    sleep 0.5
+done
+echo "$gauge"
+
+# Assertion 3: the slot's bookkeeping isn't stuck mid-flight. Poll for
+# refresh_in_flight == 0 to avoid catching a tick that fired between
+# the metric poll above and this read.
+state=$'1\t1\t1'
+for _ in $(seq 1 30); do
+    state=$(${CLICKHOUSE_CLIENT} --query "
+        SELECT refresh_in_flight, has_value, current_value_is_valid
+        FROM system.named_scalars
+        WHERE kind = 'local' AND name = '${NAME}'")
+    [ "$(printf %s "$state" | cut -f1)" = "0" ] && break
+    sleep 0.5
+done
+printf '%s\n' "$state"
diff --git a/tests/queries/0_stateless/03810_named_scalars_creator_database.reference b/tests/queries/0_stateless/03810_named_scalars_creator_database.reference
new file mode 100644
index 000000000000..661b1440bfb8
--- /dev/null
+++ b/tests/queries/0_stateless/03810_named_scalars_creator_database.reference
@@ -0,0 +1,2 @@
+60
+definition_is_qualified=1
diff --git a/tests/queries/0_stateless/03810_named_scalars_creator_database.sh b/tests/queries/0_stateless/03810_named_scalars_creator_database.sh
new file mode 100755
index 000000000000..da334aa0c5cc
--- /dev/null
+++ b/tests/queries/0_stateless/03810_named_scalars_creator_database.sh
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+# Tags: no-parallel
+#
+# Verify CREATE-time normalization rewrites unqualified identifiers with the
+# user's current database in the persisted definition. The runtime "refresh
+# re-resolves correctly after restart" property is covered by an integration
+# test (test_creator_database_normalization in test_shared_named_scalars_cluster).
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+CLICKHOUSE_LOCAL="$CLICKHOUSE_LOCAL --allow_experimental_named_scalars=1"
+
+TMP_DIR="${CLICKHOUSE_TMP}/named_scalars_db_${CLICKHOUSE_TEST_UNIQUE_NAME}"
+rm -rf "$TMP_DIR"
+mkdir -p "$TMP_DIR"
+
+# CREATE inside `mydb` so the session's currentDatabase is `mydb`. The scalar
+# references `t` unqualified; CREATE-time normalization must rewrite it to
+# `mydb.t` in the persisted .sql.
+${CLICKHOUSE_LOCAL} --path "$TMP_DIR" --multiquery --query "
+CREATE DATABASE mydb;
+CREATE TABLE mydb.t (x UInt64) ENGINE=MergeTree() ORDER BY x;
+INSERT INTO mydb.t VALUES (10), (20), (30);
+" 2>&1
+
+${CLICKHOUSE_LOCAL} --path "$TMP_DIR" --database=mydb --query "
+CREATE NAMED SCALAR cv_db REFRESH EVERY 36500 DAYS AS (SELECT sum(x) FROM t);
+" 2>&1
+
+# Sync CREATE eval already proves the value resolves to 60.
+${CLICKHOUSE_LOCAL} --path "$TMP_DIR" --query "SELECT getNamedScalar('cv_db')"
+
+# Persisted .sql must carry the qualified identifier.
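+# A plausible persisted form (illustrative; only the qualified identifier is
+# asserted below, not the exact formatting):
+#   CREATE NAMED SCALAR cv_db REFRESH EVERY 36500 DAYS AS (SELECT sum(x) FROM mydb.t)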
+SQL_FILE="$(ls ${TMP_DIR%/}/named_scalars/named_scalar_cv_db.sql 2>/dev/null | head -1)"
+if [ -n "$SQL_FILE" ] && [ -f "$SQL_FILE" ]; then
+    grep -q 'FROM mydb.t' "$SQL_FILE" && echo "definition_is_qualified=1" || echo "definition_is_qualified=0"
+else
+    echo "sql_file_missing"
+fi
+
+rm -rf "$TMP_DIR"
diff --git a/tests/queries/0_stateless/03816_named_scalars_query_log_visibility.reference b/tests/queries/0_stateless/03816_named_scalars_query_log_visibility.reference
new file mode 100644
index 000000000000..c34b35c8f73a
--- /dev/null
+++ b/tests/queries/0_stateless/03816_named_scalars_query_log_visibility.reference
@@ -0,0 +1,4 @@
+12345
+1
+saw_refresh_in_processes=1
+cancel_recorded=1
diff --git a/tests/queries/0_stateless/03816_named_scalars_query_log_visibility.sh b/tests/queries/0_stateless/03816_named_scalars_query_log_visibility.sh
new file mode 100755
index 000000000000..70cfdf657df2
--- /dev/null
+++ b/tests/queries/0_stateless/03816_named_scalars_query_log_visibility.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+# Tags: no-parallel, long
+#
+# Refresh-body visibility & cancellation. Asserts that:
+# 1. A SYSTEM REFRESH NAMED SCALAR shows up in system.query_log with is_internal=1.
+# 2. While a slow refresh is in flight, system.processes lists it (KILL QUERY targets it).
+# 3. KILL QUERY ... SYNC actually interrupts the refresh and the scalar
+#    records QUERY_WAS_CANCELLED in its exception column.
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_named_scalars=1"
+
+NAME="cv_visibility_${CLICKHOUSE_TEST_UNIQUE_NAME:-default}"
+SLOW_NAME="cv_slow_${CLICKHOUSE_TEST_UNIQUE_NAME:-default}"
+
+cleanup() {
+    ${CLICKHOUSE_CLIENT} -q "DROP NAMED SCALAR IF EXISTS ${NAME}"
+    ${CLICKHOUSE_CLIENT} -q "DROP NAMED SCALAR IF EXISTS ${SLOW_NAME}"
+}
+trap cleanup EXIT
+cleanup
+
+# --- 1) refresh body lands in system.query_log with is_internal=1 ----------
+${CLICKHOUSE_CLIENT} -q "CREATE NAMED SCALAR ${NAME} REFRESH EVERY 36500 DAYS AS SELECT toUInt64(now() < toDateTime('2200-01-01')) * 12345"
+${CLICKHOUSE_CLIENT} -q "SYSTEM REFRESH NAMED SCALAR ${NAME}"
+${CLICKHOUSE_CLIENT} -q "SELECT getNamedScalar('${NAME}')"
+${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS query_log"
+${CLICKHOUSE_CLIENT} -q "
+    SELECT count() >= 1 AS refresh_in_query_log
+    FROM system.query_log
+    WHERE is_internal = 1
+      AND type = 'QueryFinish'
+      AND query LIKE '%12345%'
+      AND query NOT LIKE '%FROM system.query_log%'
+"
+
+# --- 2 + 3) KILL QUERY interrupts a running refresh -------------------------
+# Block-aligned sleep so KILL QUERY can interrupt between blocks (default
+# block size is 65k; sleep fires per block).
+${CLICKHOUSE_CLIENT} -q "CREATE NAMED SCALAR ${SLOW_NAME} REFRESH EVERY 36500 DAYS AS SELECT min(number) FROM numbers(10000000) WHERE NOT(ignore(sleep(0.1)))"
+# Kick a refresh and immediately try to kill it. The refresh fires asynchronously
+# on the background pool; poll until system.processes shows it, then kill.
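+# (Manual equivalent of the poll-and-kill below, illustrative only:
+#   SELECT query_id FROM system.processes WHERE query LIKE '%numbers(10000000)%';
+#   KILL QUERY WHERE query_id = '<id>' SYNC;
+# The test matches on query text instead, so it needs no query_id plumbing.)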
+${CLICKHOUSE_CLIENT} -q "SYSTEM REFRESH NAMED SCALAR ${SLOW_NAME}" + +found="" +for _ in $(seq 1 60); do + found=$(${CLICKHOUSE_CLIENT} -q " + SELECT count() FROM system.processes + WHERE query LIKE '%numbers(10000000)%' AND is_initial_query = 0 + ") + [ "$found" = "1" ] && break + sleep 0.1 +done +echo "saw_refresh_in_processes=${found}" + +${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE query LIKE '%numbers(10000000)%' SYNC FORMAT Null" || true + +# Wait for the named-scalar refresh task to record the cancellation. +cancel_recorded="0" +for _ in $(seq 1 80); do + cancel_recorded=$(${CLICKHOUSE_CLIENT} -q " + SELECT coalesce(exception, '') LIKE '%QUERY_WAS_CANCELLED%' + FROM system.named_scalars + WHERE name = '${SLOW_NAME}' + ") + [ "$cancel_recorded" = "1" ] && break + sleep 0.1 +done +echo "cancel_recorded=${cancel_recorded}"