Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4988,25 +4988,32 @@ void Context::startClusterDiscovery()
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_discovery, const String & config_name)
{
std::lock_guard lock(shared->clusters_mutex);
if (ConfigHelper::getBool(*config, "allow_experimental_cluster_discovery") && enable_discovery && !shared->cluster_discovery)
{
shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, getGlobalContext(), getMacros());
}
std::lock_guard lock(shared->clusters_mutex);
if (ConfigHelper::getBool(*config, "allow_experimental_cluster_discovery") && enable_discovery && !shared->cluster_discovery)
{
shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, getGlobalContext(), getMacros());
}

/// Do not update clusters if this part of config wasn't changed.
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
return;
/// Do not update clusters if this part of config wasn't changed.
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
return;

auto old_clusters_config = shared->clusters_config;
shared->clusters_config = config;
auto old_clusters_config = shared->clusters_config;
shared->clusters_config = config;

if (!shared->clusters)
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, *settings, getMacros(), config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, *settings, config_name, old_clusters_config);
if (!shared->clusters)
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, *settings, getMacros(), config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, *settings, config_name, old_clusters_config);

++shared->clusters_version;
++shared->clusters_version;
}
{
SharedLockGuard lock(shared->mutex);
if (shared->ddl_worker)
shared->ddl_worker->notifyHostIDsUpdated();
}
}

size_t Context::getClustersVersion() const
Expand Down
148 changes: 110 additions & 38 deletions src/Interpreters/DDLWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
#include <Core/ServerSettings.h>
#include <Core/ServerUUID.h>
#include <Core/Settings.h>
#include <Databases/DatabaseReplicated.h>
#include <IO/NullWriteBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/ZooKeeperLog.h>
Expand Down Expand Up @@ -188,6 +188,26 @@ ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
return current_zookeeper;
}

/// Signal the main thread that the set of known host IDs changed,
/// so it re-runs markReplicasActive on its next iteration.
void DDLWorker::notifyHostIDsUpdated()
{
    host_ids_updated = true;
    LOG_INFO(log, "Host IDs updated");
}

/// Check the given hosts against the set already seen by markReplicasActive;
/// on the first unseen host, notify the main thread and stop scanning.
void DDLWorker::updateHostIDs(const std::vector<HostID> & hosts)
{
    std::lock_guard lock{checked_host_id_set_mutex};
    for (const auto & host : hosts)
    {
        const auto host_string = host.toString();
        if (checked_host_id_set.contains(host_string))
            continue;

        LOG_INFO(log, "Found new host ID: {}", host_string);
        notifyHostIDsUpdated();
        return;
    }
}


DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool /*dry_run*/)
{
Expand Down Expand Up @@ -223,6 +243,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
{
/// Stage 1: parse entry
task->entry.parse(node_data);
updateHostIDs(task->entry.hosts);
}
catch (...)
{
Expand Down Expand Up @@ -1139,7 +1160,7 @@ bool DDLWorker::initializeMainThread()
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(queue_dir) / "");
initializeReplication();
markReplicasActive(true);
markReplicasActive(/*reinitialized=*/true);
initialized = true;
return true;
}
Expand Down Expand Up @@ -1194,7 +1215,7 @@ void DDLWorker::runMainThread()


setThreadName("DDLWorker");
LOG_DEBUG(log, "Starting DDLWorker thread");
LOG_INFO(log, "Starting DDLWorker thread");

while (!stop_flag)
{
Expand All @@ -1203,14 +1224,18 @@ void DDLWorker::runMainThread()
bool reinitialized = !initialized;

/// Reinitialize DDLWorker state (including ZooKeeper connection) if required
if (!initialized)
if (reinitialized)
{
/// Stopped
if (!initializeMainThread())
break;

LOG_DEBUG(log, "Initialized DDLWorker thread");
}

if (host_ids_updated.exchange(false))
markReplicasActive(/*reinitialized=*/false);

cleanup_event->set();
scheduleTasks(reinitialized);
subsequent_errors_count = 0;
Expand Down Expand Up @@ -1269,61 +1294,104 @@ void DDLWorker::runMainThread()
void DDLWorker::initializeReplication()
{
auto zookeeper = getZooKeeper();

zookeeper->createAncestors(fs::path(replicas_dir) / "");

NameSet host_id_set;
for (const auto & it : context->getClusters())
{
auto cluster = it.second;
for (const auto & host_ids : cluster->getHostIDs())
for (const auto & host_id : host_ids)
host_id_set.emplace(host_id);
}

createReplicaDirs(zookeeper, host_id_set);
}

/// Ensure a ZooKeeper directory exists under replicas_dir for each host ID.
void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
{
    for (const auto & host : host_ids)
    {
        LOG_INFO(log, "Creating replica dir for host id {}", host);
        const auto replica_path = fs::path(replicas_dir) / host / "";
        zookeeper->createAncestors(replica_path);
    }
}

void DDLWorker::markReplicasActive(bool /*reinitialized*/)
void DDLWorker::markReplicasActive(bool reinitialized)
{
auto zookeeper = getZooKeeper();
const auto maybe_secure_port = context->getTCPPortSecure();
const auto port = context->getTCPPort();

auto all_host_ids = getAllHostIDsFromClusters();

// Reset all active_node_holders
for (auto & it : active_node_holders)
// Add interserver IO host IDs for Replicated DBs
try
{
auto host_port = context->getInterserverIOAddress();
HostID interserver_io_host_id = {host_port.first, port};
all_host_ids.emplace(interserver_io_host_id.toString());
LOG_INFO(log, "Add interserver IO host ID {}", interserver_io_host_id.toString());
if (maybe_secure_port)
{
HostID interserver_io_secure_host_id = {host_port.first, *maybe_secure_port};
all_host_ids.emplace(interserver_io_secure_host_id.toString());
LOG_INFO(log, "Add interserver IO secure host ID {}", interserver_io_secure_host_id.toString());
}
}
catch (const Exception & e)
{
auto & active_node_holder = it.second.second;
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();
LOG_INFO(log, "Unable to get interserver IO address, error {}", e.what());
}

active_node_holders.clear();
createReplicaDirs(zookeeper, all_host_ids);

if (reinitialized)
{
// Reset all active_node_holders
for (auto & it : active_node_holders)
{
auto & active_node_holder = it.second.second;
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();
}
active_node_holders.clear();
}

const auto maybe_secure_port = context->getTCPPortSecure();
const auto port = context->getTCPPort();

Coordination::Stat replicas_stat;
Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat);
NameSet local_host_ids;
NameSet checking_host_ids;
checking_host_ids.reserve(host_ids.size());
for (const auto & host_id : host_ids)
{
bool is_self_host = false;
try
{
HostID host = HostID::fromString(host_id);
if (DDLTask::isSelfHostID(log, host, maybe_secure_port, port))
local_host_ids.emplace(host_id);
checking_host_ids.insert(host.toString());

is_self_host = DDLTask::isSelfHostID(log, host, maybe_secure_port, port);
}
catch (const Exception & e)
{
LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText());
continue;
}

LOG_INFO(log, "Self host_id ({}) = {}", host_id, is_self_host);
if (is_self_host)
{
local_host_ids.emplace(host_id);
continue;
}

if (!reinitialized)
{
/// Remove this host_id from active_node_holders
auto it = active_node_holders.find(host_id);
if (it != active_node_holders.end())
{
auto & active_node_holder = it->second.second;
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();

active_node_holders.erase(it);
}
continue;
}
}

for (const auto & host_id : local_host_ids)
Expand Down Expand Up @@ -1371,17 +1439,9 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/)
active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
}

if (active_node_holders.empty())
{
for (const auto & it : context->getClusters())
{
const auto & cluster = it.second;
if (!cluster->getHostIDs().empty())
{
LOG_WARNING(log, "There are clusters with host ids but no local host found for this replica.");
break;
}
}
std::lock_guard lock{checked_host_id_set_mutex};
checked_host_id_set = checking_host_ids;
}
}

Expand Down Expand Up @@ -1448,4 +1508,16 @@ void DDLWorker::runCleanupThread()
}
}

/// Collect the host IDs of every shard of every configured cluster into one set.
NameSet DDLWorker::getAllHostIDsFromClusters() const
{
    NameSet host_id_set;
    for (const auto & name_and_cluster : context->getClusters())
    {
        /// Bind by reference: copying the cluster pointer would incur an
        /// atomic refcount increment/decrement per iteration for no benefit.
        const auto & cluster = name_and_cluster.second;
        for (const auto & shard_host_ids : cluster->getHostIDs())
            for (const auto & host_id : shard_host_ids)
                host_id_set.emplace(host_id);
    }
    return host_id_set;
}
}
16 changes: 14 additions & 2 deletions src/Interpreters/DDLWorker.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
#pragma once

#include <Core/Names.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DDLTask.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Poco/Event.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Interpreters/Context_fwd.h>
#include <Poco/Event.h>

#include <atomic>
#include <list>
Expand Down Expand Up @@ -94,6 +96,9 @@ class DDLWorker
/// Should be called in `initializeMainThread` only, so if it is expired, `runMainThread` will reinitialized the state.
ZooKeeperPtr getAndSetZooKeeper();

void notifyHostIDsUpdated();
void updateHostIDs(const std::vector<HostID> & hosts);

protected:

class ConcurrentSet
Expand Down Expand Up @@ -173,6 +178,8 @@ class DDLWorker
void runMainThread();
void runCleanupThread();

NameSet getAllHostIDsFromClusters() const;

ContextMutablePtr context;
LoggerPtr log;

Expand Down Expand Up @@ -209,6 +216,7 @@ class DDLWorker

/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
Int64 cleanup_delay_period = 60; // minute (in seconds)
std::atomic_bool host_ids_updated{false};
/// Delete node if its age is greater than that
Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
/// How many tasks could be in the queue
Expand All @@ -221,6 +229,10 @@ class DDLWorker
std::atomic_uint64_t subsequent_errors_count = 0;
String last_unexpected_error;

mutable std::mutex checked_host_id_set_mutex;
NameSet checked_host_id_set TSA_GUARDED_BY(checked_host_id_set_mutex);


const CurrentMetrics::Metric * max_entry_metric;
const CurrentMetrics::Metric * max_pushed_entry_metric;

Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/DistributedQueryStatusSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ NameSet DistributedQueryStatusSource::getOfflineHosts(const NameSet & hosts_to_w
if (offline.size() == hosts_to_wait.size())
{
/// Avoid reporting that all hosts are offline
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
LOG_WARNING(
log, "Did not find active hosts, will wait for all hosts: {}. This should not happen often", fmt::join(hosts_to_wait, ", "));
return {};
}

Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/executeDDLQueryOnCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
entry.setSettingsIfRequired(context);
entry.tracing_context = OpenTelemetry::CurrentContext();
entry.initial_query_id = context->getClientInfo().initial_query_id;
ddl_worker.updateHostIDs(entry.hosts);
String node_path = ddl_worker.enqueueQuery(entry, params.retries_info);

return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<clickhouse>
<remote_servers>
</remote_servers>
</clickhouse>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<clickhouse>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
<max_tasks_in_queue>10</max_tasks_in_queue>
<task_max_lifetime>3600</task_max_lifetime>
<cleanup_delay_period>5</cleanup_delay_period>
</distributed_ddl>
</clickhouse>
Loading
Loading