From 91627b0ca44121f990ff22de578fa70de76d4d26 Mon Sep 17 00:00:00 2001 From: Tuan Pham Anh Date: Thu, 22 Jan 2026 06:30:57 +0000 Subject: [PATCH] Merge pull request #92339 from tuanpach/ddl-worker-mark-replicas-active-on-new-host-ids Check and mark the interserver IO address active in DDL worker --- src/Interpreters/Context.cpp | 35 +++-- src/Interpreters/DDLWorker.cpp | 148 +++++++++++++----- src/Interpreters/DDLWorker.h | 16 +- .../DistributedQueryStatusSource.cpp | 3 +- src/Interpreters/executeDDLQueryOnCluster.cpp | 1 + .../__init__.py | 0 .../configs/config.d/remote_servers.xml | 4 + .../configs/config.d/settings.xml | 8 + .../configs/users.d/query_log.xml | 9 ++ .../test.py | 76 +++++++++ 10 files changed, 245 insertions(+), 55 deletions(-) create mode 100644 tests/integration/test_distributed_ddl_on_database_cluster/__init__.py create mode 100644 tests/integration/test_distributed_ddl_on_database_cluster/configs/config.d/remote_servers.xml create mode 100644 tests/integration/test_distributed_ddl_on_database_cluster/configs/config.d/settings.xml create mode 100644 tests/integration/test_distributed_ddl_on_database_cluster/configs/users.d/query_log.xml create mode 100755 tests/integration/test_distributed_ddl_on_database_cluster/test.py diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 72b8eb049a15..b554e36fe257 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -4988,25 +4988,32 @@ void Context::startClusterDiscovery() /// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_discovery, const String & config_name) { - std::lock_guard lock(shared->clusters_mutex); - if (ConfigHelper::getBool(*config, "allow_experimental_cluster_discovery") && enable_discovery && !shared->cluster_discovery) { - shared->cluster_discovery = std::make_unique(*config, getGlobalContext(), getMacros()); - } + std::lock_guard lock(shared->clusters_mutex); + if (ConfigHelper::getBool(*config, "allow_experimental_cluster_discovery") && enable_discovery && !shared->cluster_discovery) + { + shared->cluster_discovery = std::make_unique(*config, getGlobalContext(), getMacros()); + } - /// Do not update clusters if this part of config wasn't changed. - if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name)) - return; + /// Do not update clusters if this part of config wasn't changed. + if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name)) + return; - auto old_clusters_config = shared->clusters_config; - shared->clusters_config = config; + auto old_clusters_config = shared->clusters_config; + shared->clusters_config = config; - if (!shared->clusters) - shared->clusters = std::make_shared(*shared->clusters_config, *settings, getMacros(), config_name); - else - shared->clusters->updateClusters(*shared->clusters_config, *settings, config_name, old_clusters_config); + if (!shared->clusters) + shared->clusters = std::make_shared(*shared->clusters_config, *settings, getMacros(), config_name); + else + shared->clusters->updateClusters(*shared->clusters_config, *settings, config_name, old_clusters_config); - ++shared->clusters_version; + ++shared->clusters_version; + } + { + SharedLockGuard lock(shared->mutex); + if (shared->ddl_worker) + shared->ddl_worker->notifyHostIDsUpdated(); + } } size_t Context::getClustersVersion() const diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index f4b41bc80c58..a3f456faca7c 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -2,13 +2,13 @@ #include #include #include +#include #include #include #include #include #include #include -#include #include #include #include @@ -188,6 +188,26 @@ ZooKeeperPtr DDLWorker::getAndSetZooKeeper() return current_zookeeper; } +void DDLWorker::notifyHostIDsUpdated() +{ + LOG_INFO(log, "Host IDs updated"); + host_ids_updated = true; +} + +void DDLWorker::updateHostIDs(const std::vector & hosts) +{ + std::lock_guard lock{checked_host_id_set_mutex}; + for (const auto & host : hosts) + { + if (!checked_host_id_set.contains(host.toString())) + { + LOG_INFO(log, "Found new host ID: {}", host.toString()); + notifyHostIDsUpdated(); + return; + } + } +} + DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool /*dry_run*/) { @@ -223,6 +243,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r { /// Stage 1: parse entry task->entry.parse(node_data); + updateHostIDs(task->entry.hosts); } catch (...) { @@ -1139,7 +1160,7 @@ bool DDLWorker::initializeMainThread() auto zookeeper = getAndSetZooKeeper(); zookeeper->createAncestors(fs::path(queue_dir) / ""); initializeReplication(); - markReplicasActive(true); + markReplicasActive(/*reinitialized=*/true); initialized = true; return true; } @@ -1194,7 +1215,7 @@ void DDLWorker::runMainThread() setThreadName("DDLWorker"); - LOG_DEBUG(log, "Starting DDLWorker thread"); + LOG_INFO(log, "Starting DDLWorker thread"); while (!stop_flag) { @@ -1203,14 +1224,18 @@ void DDLWorker::runMainThread() bool reinitialized = !initialized; /// Reinitialize DDLWorker state (including ZooKeeper connection) if required - if (!initialized) + if (reinitialized) { /// Stopped if (!initializeMainThread()) break; + LOG_DEBUG(log, "Initialized DDLWorker thread"); } + if (host_ids_updated.exchange(false)) + markReplicasActive(/*reinitialized=*/false); + cleanup_event->set(); scheduleTasks(reinitialized); subsequent_errors_count = 0; @@ -1269,61 +1294,104 @@ void DDLWorker::runMainThread() void DDLWorker::initializeReplication() { auto zookeeper = getZooKeeper(); - zookeeper->createAncestors(fs::path(replicas_dir) / ""); - - NameSet host_id_set; - for (const auto & it : context->getClusters()) - { - auto cluster = it.second; - for (const auto & host_ids : cluster->getHostIDs()) - for (const auto & host_id : host_ids) - host_id_set.emplace(host_id); - } - - createReplicaDirs(zookeeper, host_id_set); } void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids) { for (const auto & host_id : host_ids) + { + LOG_INFO(log, "Creating replica dir for host id {}", host_id); zookeeper->createAncestors(fs::path(replicas_dir) / host_id / ""); + } } -void DDLWorker::markReplicasActive(bool /*reinitialized*/) +void DDLWorker::markReplicasActive(bool reinitialized) { auto zookeeper = getZooKeeper(); + const auto maybe_secure_port = context->getTCPPortSecure(); + const auto port = context->getTCPPort(); + + auto all_host_ids = getAllHostIDsFromClusters(); - // Reset all active_node_holders - for (auto & it : active_node_holders) + // Add interserver IO host IDs for Replicated DBs + try + { + auto host_port = context->getInterserverIOAddress(); + HostID interserver_io_host_id = {host_port.first, port}; + all_host_ids.emplace(interserver_io_host_id.toString()); + LOG_INFO(log, "Add interserver IO host ID {}", interserver_io_host_id.toString()); + if (maybe_secure_port) + { + HostID interserver_io_secure_host_id = {host_port.first, *maybe_secure_port}; + all_host_ids.emplace(interserver_io_secure_host_id.toString()); + LOG_INFO(log, "Add interserver IO secure host ID {}", interserver_io_secure_host_id.toString()); + } + } + catch (const Exception & e) { - auto & active_node_holder = it.second.second; - if (active_node_holder) - active_node_holder->setAlreadyRemoved(); - active_node_holder.reset(); + LOG_INFO(log, "Unable to get interserver IO address, error {}", e.what()); } - active_node_holders.clear(); + createReplicaDirs(zookeeper, all_host_ids); + + if (reinitialized) + { + // Reset all active_node_holders + for (auto & it : active_node_holders) + { + auto & active_node_holder = it.second.second; + if (active_node_holder) + active_node_holder->setAlreadyRemoved(); + active_node_holder.reset(); + } + active_node_holders.clear(); + } - const auto maybe_secure_port = context->getTCPPortSecure(); - const auto port = context->getTCPPort(); Coordination::Stat replicas_stat; Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat); NameSet local_host_ids; + NameSet checking_host_ids; + checking_host_ids.reserve(host_ids.size()); for (const auto & host_id : host_ids) { + bool is_self_host = false; try { HostID host = HostID::fromString(host_id); - if (DDLTask::isSelfHostID(log, host, maybe_secure_port, port)) - local_host_ids.emplace(host_id); + checking_host_ids.insert(host.toString()); + + is_self_host = DDLTask::isSelfHostID(log, host, maybe_secure_port, port); } catch (const Exception & e) { LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText()); continue; } + + LOG_INFO(log, "Self host_id ({}) = {}", host_id, is_self_host); + if (is_self_host) + { + local_host_ids.emplace(host_id); + continue; + } + + if (!reinitialized) + { + /// Remove this host_id from active_node_holders + auto it = active_node_holders.find(host_id); + if (it != active_node_holders.end()) + { + auto & active_node_holder = it->second.second; + if (active_node_holder) + active_node_holder->setAlreadyRemoved(); + active_node_holder.reset(); + + active_node_holders.erase(it); + } + continue; + } } for (const auto & host_id : local_host_ids) @@ -1371,17 +1439,9 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/) active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder}; } - if (active_node_holders.empty()) { - for (const auto & it : context->getClusters()) - { - const auto & cluster = it.second; - if (!cluster->getHostIDs().empty()) - { - LOG_WARNING(log, "There are clusters with host ids but no local host found for this replica."); - break; - } - } + std::lock_guard lock{checked_host_id_set_mutex}; + checked_host_id_set = checking_host_ids; } } @@ -1448,4 +1508,16 @@ void DDLWorker::runCleanupThread() } } +NameSet DDLWorker::getAllHostIDsFromClusters() const +{ + NameSet host_id_set; + for (const auto & it : context->getClusters()) + { + auto cluster = it.second; + for (const auto & host_ids : cluster->getHostIDs()) + for (const auto & host_id : host_ids) + host_id_set.emplace(host_id); + } + return host_id_set; +} } diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index abf0a9f84098..46289ee92f89 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -1,15 +1,17 @@ #pragma once +#include +#include +#include #include #include +#include #include #include #include #include #include #include -#include -#include #include #include @@ -94,6 +96,9 @@ class DDLWorker /// Should be called in `initializeMainThread` only, so if it is expired, `runMainThread` will reinitialized the state. ZooKeeperPtr getAndSetZooKeeper(); + void notifyHostIDsUpdated(); + void updateHostIDs(const std::vector & hosts); + protected: class ConcurrentSet @@ -173,6 +178,8 @@ class DDLWorker void runMainThread(); void runCleanupThread(); + NameSet getAllHostIDsFromClusters() const; + ContextMutablePtr context; LoggerPtr log; @@ -209,6 +216,7 @@ class DDLWorker /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago Int64 cleanup_delay_period = 60; // minute (in seconds) + std::atomic_bool host_ids_updated{false}; /// Delete node if its age is greater than that Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds) /// How many tasks could be in the queue @@ -221,6 +229,10 @@ class DDLWorker std::atomic_uint64_t subsequent_errors_count = 0; String last_unexpected_error; + mutable std::mutex checked_host_id_set_mutex; + NameSet checked_host_id_set TSA_GUARDED_BY(checked_host_id_set_mutex); + + const CurrentMetrics::Metric * max_entry_metric; const CurrentMetrics::Metric * max_pushed_entry_metric; diff --git a/src/Interpreters/DistributedQueryStatusSource.cpp b/src/Interpreters/DistributedQueryStatusSource.cpp index 59a82959d0d0..ac1f8fa19604 100644 --- a/src/Interpreters/DistributedQueryStatusSource.cpp +++ b/src/Interpreters/DistributedQueryStatusSource.cpp @@ -102,7 +102,8 @@ NameSet DistributedQueryStatusSource::getOfflineHosts(const NameSet & hosts_to_w if (offline.size() == hosts_to_wait.size()) { /// Avoid reporting that all hosts are offline - LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size()); + LOG_WARNING( + log, "Did not find active hosts, will wait for all hosts: {}. This should not happen often", fmt::join(hosts_to_wait, ", ")); return {}; } diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index 46ef1aaafee3..9204b541ef35 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -188,6 +188,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context, entry.setSettingsIfRequired(context); entry.tracing_context = OpenTelemetry::CurrentContext(); entry.initial_query_id = context->getClientInfo().initial_query_id; + ddl_worker.updateHostIDs(entry.hosts); String node_path = ddl_worker.enqueueQuery(entry, params.retries_info); return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context); diff --git a/tests/integration/test_distributed_ddl_on_database_cluster/__init__.py b/tests/integration/test_distributed_ddl_on_database_cluster/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_distributed_ddl_on_database_cluster/configs/config.d/remote_servers.xml b/tests/integration/test_distributed_ddl_on_database_cluster/configs/config.d/remote_servers.xml new file mode 100644 index 000000000000..d6a932c56c3c --- /dev/null +++ b/tests/integration/test_distributed_ddl_on_database_cluster/configs/config.d/remote_servers.xml @@ -0,0 +1,4 @@ + + + + diff --git a/tests/integration/test_distributed_ddl_on_database_cluster/configs/config.d/settings.xml b/tests/integration/test_distributed_ddl_on_database_cluster/configs/config.d/settings.xml new file mode 100644 index 000000000000..02708c22026a --- /dev/null +++ b/tests/integration/test_distributed_ddl_on_database_cluster/configs/config.d/settings.xml @@ -0,0 +1,8 @@ + + + /clickhouse/task_queue/ddl + 10 + 3600 + 5 + + \ No newline at end of file diff --git a/tests/integration/test_distributed_ddl_on_database_cluster/configs/users.d/query_log.xml b/tests/integration/test_distributed_ddl_on_database_cluster/configs/users.d/query_log.xml new file mode 100644 index 000000000000..ef8abbd91741 --- /dev/null +++ b/tests/integration/test_distributed_ddl_on_database_cluster/configs/users.d/query_log.xml @@ -0,0 +1,9 @@ + + + + + 1 + 1 + + + diff --git a/tests/integration/test_distributed_ddl_on_database_cluster/test.py b/tests/integration/test_distributed_ddl_on_database_cluster/test.py new file mode 100755 index 000000000000..c54170a56e23 --- /dev/null +++ b/tests/integration/test_distributed_ddl_on_database_cluster/test.py @@ -0,0 +1,76 @@ +import os +import sys +import time +import uuid +import pytest + +from helpers.cluster import ClickHouseCluster +from helpers.test_tools import assert_eq_with_retry + +from helpers.test_tools import TSV + +cluster = ClickHouseCluster(__file__) + +node1 = cluster.add_instance( + "node1", + main_configs=["configs/config.d/settings.xml"], + user_configs=["configs/users.d/query_log.xml"], + with_zookeeper=True, + macros={"shard": 1, "replica": 1}, +) +node2 = cluster.add_instance( + "node2", + main_configs=["configs/config.d/settings.xml"], + user_configs=["configs/users.d/query_log.xml"], + with_zookeeper=True, + macros={"shard": 1, "replica": 2}, +) + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + + +def test_waiting_replicated_database_hosts(started_cluster): + node1.query("DROP DATABASE IF EXISTS db SYNC") + node2.query("DROP DATABASE IF EXISTS db SYNC") + + node1.query("DROP TABLE IF EXISTS t SYNC") + node2.query("DROP TABLE IF EXISTS t SYNC") + + node1.query( + "CREATE DATABASE db ENGINE=Replicated('/test/db', '{shard}', '{replica}')" + ) + node2.query( + "CREATE DATABASE db ENGINE=Replicated('/test/db', '{shard}', '{replica}')" + ) + + query_id = str(uuid.uuid4()) + node1.query( + "CREATE TABLE t ON CLUSTER 'db' (x INT, y INT) ENGINE=MergeTree ORDER BY x", + settings={"distributed_ddl_output_mode": "throw_only_active"}, + query_id=query_id, + ) + assert ( + node2.query("SELECT count() FROM system.tables WHERE name='t'").strip() == "1" + ) + node1.query("SYSTEM FLUSH LOGS") + assert ( + node1.query( + f"SELECT count() FROM system.text_log WHERE query_id='{query_id}' AND level='Warning' AND message LIKE '%Did not find active hosts%'" + ).strip() + == "0" + ) + + node1.query("DROP DATABASE IF EXISTS db SYNC") + node2.query("DROP DATABASE IF EXISTS db SYNC") + + node1.query("DROP TABLE IF EXISTS t SYNC") + node2.query("DROP TABLE IF EXISTS t SYNC")