Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
M(MergeParts, "Number of source parts participating in current background merges") \
M(Move, "Number of currently executing moves") \
M(Export, "Number of currently executing exports") \
M(ExportPartitionLockWaitingReaders, "Number of threads currently waiting to acquire the export partition in-memory state lock for reading (shared).") \
M(ExportPartitionLockWaitingWriters, "Number of threads currently waiting to acquire the export partition in-memory state lock for writing (exclusive).") \
M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
M(ReplicatedSend, "Number of data parts being sent to replicas") \
Expand Down
4 changes: 4 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@
M(ExportPartitionZooKeeperMulti, "Number of 'multi' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartitionZooKeeperExists, "Number of 'exists' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \
M(ExportPartsRejectedByMemoryLimit, "Number of background export part tasks rejected due to background memory limit.", ValueType::Number) \
M(ExportPartitionLockReadWaitMicroseconds, "Total time spent waiting to acquire the export partition in-memory state lock for reading (shared).", ValueType::Microseconds) \
M(ExportPartitionLockWriteWaitMicroseconds, "Total time spent waiting to acquire the export partition in-memory state lock for writing (exclusive).", ValueType::Microseconds) \
M(ExportPartitionLockReadAcquisitions, "Number of times the export partition in-memory state lock was acquired for reading (shared).", ValueType::Number) \
M(ExportPartitionLockWriteAcquisitions, "Number of times the export partition in-memory state lock was acquired for writing (exclusive).", ValueType::Number) \
\
M(DistributedConnectionTries, "Total count of distributed connection attempts.", ValueType::Number) \
M(DistributedConnectionUsable, "Total count of successful distributed connections to a usable server (with required table, but maybe stale).", ValueType::Number) \
Expand Down
177 changes: 107 additions & 70 deletions src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ class ExportPartitionManifestUpdatingTask

std::mutex status_changes_mutex;
std::queue<std::string> status_changes;

/// Serializes the full bodies of poll() and handleStatusChanges() against each other.
/// Held across ZooKeeper I/O so those two tasks never overlap; the mirror lock
/// (StorageReplicatedMergeTree::export_merge_tree_partition_mutex) is then taken only
/// briefly under this, for the in-memory container mutations. This is what lets the
/// system.replicated_partition_exports reader (which takes the mirror lock shared and
/// briefly) avoid waiting behind slow-network ZooKeeper round-trips.
/// Lock ordering: this -> export_merge_tree_partition_mutex -> export_manifests_mutex.
std::mutex background_task_serialization_mutex;
};

}
47 changes: 33 additions & 14 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,51 @@ void ExportPartitionTaskScheduler::run()
const uint32_t seed = uint32_t(std::hash<std::string>{}(storage.replica_name)) ^ uint32_t(scheduled_exports_count);
pcg64_fast rng(seed);

std::lock_guard lock(storage.export_merge_tree_partition_mutex);
/// Snapshot the PENDING entries under a brief shared lock, then perform all ZooKeeper
/// work and exportPartToTable below WITHOUT holding the lock. The scheduler is a pure
/// reader of the in-memory mirror - it no longer writes entry.status (the status
/// converges via the status watch -> handleStatusChanges and poll()), so a shared lock
/// is sufficient and the system.replicated_partition_exports reader is never blocked by
/// the scheduler.
struct PendingEntrySnapshot
{
std::string key;
ExportReplicatedMergeTreePartitionManifest manifest;
};

std::vector<PendingEntrySnapshot> pending_entries;
{
auto lock = ExportPartitionUtils::lockShared(storage.export_merge_tree_partition_mutex);

// Iterate sorted by create_time
for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time)
{
/// No need to query zk for status if the local one is not PENDING
if (entry.status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Local status is {}", magic_enum::enum_name(entry.status).data());
continue;
}

pending_entries.push_back(PendingEntrySnapshot{entry.getCompositeKey(), entry.manifest});
}
}

auto zk = storage.getZooKeeper();

// Iterate sorted by create_time
for (auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time)
for (const auto & pending : pending_entries)
{
if (scheduled_exports_count >= available_move_executors)
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduled exports count is greater than available move executors, skipping");
break;
}

const auto & manifest = entry.manifest;
const auto key = entry.getCompositeKey();
const auto & manifest = pending.manifest;
const auto & key = pending.key;
const auto database = storage.getContext()->resolveDatabase(manifest.destination_database);
const auto & table = manifest.destination_table;

/// No need to query zk for status if the local one is not PENDING
if (entry.status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Local status is {}", magic_enum::enum_name(entry.status).data());
continue;
}

const auto destination_storage_id = StorageID(QualifiedTableName {database, table});

const auto destination_storage = DatabaseCatalog::instance().tryGetTable(destination_storage_id, storage.getContext());
Expand Down Expand Up @@ -131,8 +151,7 @@ void ExportPartitionTaskScheduler::run()

if (status_in_zk.value() != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
{
entry.status = status_in_zk.value();
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping {}... Status from zk is {}", key, magic_enum::enum_name(entry.status).data());
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping {}... Status from zk is {}", key, magic_enum::enum_name(status_in_zk.value()).data());
continue;
}

Expand Down
32 changes: 32 additions & 0 deletions src/Storages/MergeTree/ExportPartitionUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include <Storages/MergeTree/ExportPartitionUtils.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/FailPoint.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
Expand Down Expand Up @@ -28,6 +30,16 @@ namespace ProfileEvents
extern const Event ExportPartitionZooKeeperGetChildren;
extern const Event ExportPartitionZooKeeperSet;
extern const Event ExportPartitionZooKeeperMulti;
extern const Event ExportPartitionLockReadWaitMicroseconds;
extern const Event ExportPartitionLockWriteWaitMicroseconds;
extern const Event ExportPartitionLockReadAcquisitions;
extern const Event ExportPartitionLockWriteAcquisitions;
}

namespace CurrentMetrics
{
extern const Metric ExportPartitionLockWaitingReaders;
extern const Metric ExportPartitionLockWaitingWriters;
}

namespace DB
Expand Down Expand Up @@ -57,6 +69,26 @@ namespace fs = std::filesystem;

namespace ExportPartitionUtils
{
std::shared_lock<SharedMutex> lockShared(SharedMutex & mutex)
{
CurrentMetrics::Increment waiting(CurrentMetrics::ExportPartitionLockWaitingReaders);
Stopwatch watch;
std::shared_lock<SharedMutex> lock(mutex);
ProfileEvents::increment(ProfileEvents::ExportPartitionLockReadWaitMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ExportPartitionLockReadAcquisitions);
return lock;
}

std::unique_lock<SharedMutex> lockExclusive(SharedMutex & mutex)
{
CurrentMetrics::Increment waiting(CurrentMetrics::ExportPartitionLockWaitingWriters);
Stopwatch watch;
std::unique_lock<SharedMutex> lock(mutex);
ProfileEvents::increment(ProfileEvents::ExportPartitionLockWriteWaitMicroseconds, watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::ExportPartitionLockWriteAcquisitions);
return lock;
}

Block getPartitionSourceBlockForIcebergCommit(
MergeTreeData & storage, const String & partition_id)
{
Expand Down
12 changes: 12 additions & 0 deletions src/Storages/MergeTree/ExportPartitionUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
#include <filesystem>
#include <vector>
#include <string>
#include <mutex>
#include <shared_mutex>
#include <Core/Field.h>
#include <Common/Logger.h>
#include <Common/SharedMutex.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include "Storages/IStorage.h"
#include <Storages/StorageInMemoryMetadata.h>
Expand All @@ -23,6 +26,15 @@ struct ExportReplicatedMergeTreePartitionManifest;

namespace ExportPartitionUtils
{
/// Instrumented acquisition of the storage-wide export partition state lock.
/// `lockShared` is for readers (e.g. system.replicated_partition_exports); `lockExclusive`
/// is for the brief in-memory mutations performed by the background tasks and KILL EXPORT
/// PARTITION. Both record wait time and waiting-thread counts via the ExportPartitionLock*
/// metrics. The lock MUST NOT be held across ZooKeeper round-trips - gather ZK data first,
/// then take the lock only to apply the result.
std::shared_lock<SharedMutex> lockShared(SharedMutex & mutex);
std::unique_lock<SharedMutex> lockExclusive(SharedMutex & mutex);

std::vector<std::string> getExportedPaths(const LoggerPtr & log, const zkutil::ZooKeeperPtr & zk, const std::string & export_path);

ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest);
Expand Down
32 changes: 25 additions & 7 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6132,7 +6132,7 @@ void StorageReplicatedMergeTree::shutdown(bool)
}

{
std::lock_guard lock(export_merge_tree_partition_mutex);
auto lock = ExportPartitionUtils::lockExclusive(export_merge_tree_partition_mutex);
export_merge_tree_partition_task_entries.clear();
}

Expand Down Expand Up @@ -10070,6 +10070,8 @@ CancellationCode StorageReplicatedMergeTree::killExportPartition(const String &
/// Called from a query thread (KILL EXPORT PARTITION via InterpreterKillQueryQuery), which does not have a component set.
auto component_guard = Coordination::setCurrentComponent("StorageReplicatedMergeTree::killExportPartition");

/// This is best-effort, even if we manage to set it to killed, it might be overwritten by a successful commit.

auto try_set_status_to_killed = [this](const zkutil::ZooKeeperPtr & zk, const std::string & status_path)
{
Coordination::Stat stat;
Expand Down Expand Up @@ -10105,23 +10107,39 @@ CancellationCode StorageReplicatedMergeTree::killExportPartition(const String &
return CancellationCode::CancelSent;
};

std::lock_guard lock(export_merge_tree_partition_mutex);

const auto zk = getZooKeeper();

/// Look up the entry in the in-memory mirror under a brief shared lock and copy out what we
/// need; release it before any ZooKeeper round-trip so the system.replicated_partition_exports
/// reader is never blocked. This is a pure read of the container, so a shared lock is enough -
/// the KILLED status set in ZooKeeper below propagates back into the mirror via the status
/// watch -> handleStatusChanges.
bool local_entry_found = false;
bool local_entry_pending = false;
std::string local_composite_key;
{
auto lock = ExportPartitionUtils::lockShared(export_merge_tree_partition_mutex);
const auto entry = export_merge_tree_partition_task_entries_by_transaction_id.find(transaction_id);
if (entry != export_merge_tree_partition_task_entries_by_transaction_id.end())
{
local_entry_found = true;
local_entry_pending = entry->status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING;
local_composite_key = entry->getCompositeKey();
}
}

/// if we have the entry locally, no need to list from zk. we can save some requests.
const auto & entry = export_merge_tree_partition_task_entries_by_transaction_id.find(transaction_id);
if (entry != export_merge_tree_partition_task_entries_by_transaction_id.end())
if (local_entry_found)
{
LOG_INFO(log, "Export partition task found locally, trying to cancel it");
/// found locally, no need to get children on zk
if (entry->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
if (!local_entry_pending)
{
LOG_INFO(log, "Export partition task is not pending, can not cancel it");
return CancellationCode::CancelCannotBeSent;
}

return try_set_status_to_killed(zk, fs::path(zookeeper_path) / "exports" / entry->getCompositeKey() / "status");
return try_set_status_to_killed(zk, fs::path(zookeeper_path) / "exports" / local_composite_key / "status");
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Common/EventNotifier.h>
#include <Common/ProfileEventsScope.h>
#include <Common/SharedMutex.h>
#include <Common/Throttler.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
Expand Down Expand Up @@ -525,7 +526,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData

Coordination::WatchCallbackPtr export_merge_tree_partition_watch_callback;

std::mutex export_merge_tree_partition_mutex;
mutable SharedMutex export_merge_tree_partition_mutex;

BackgroundSchedulePoolTaskHolder export_merge_tree_partition_select_task;

Expand Down
Loading