diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index c46cc25687aa..3dfee74eb565 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -13,6 +13,8 @@ M(MergeParts, "Number of source parts participating in current background merges") \ M(Move, "Number of currently executing moves") \ M(Export, "Number of currently executing exports") \ + M(ExportPartitionLockWaitingReaders, "Number of threads currently waiting to acquire the export partition in-memory state lock for reading (shared).") \ + M(ExportPartitionLockWaitingWriters, "Number of threads currently waiting to acquire the export partition in-memory state lock for writing (exclusive).") \ M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \ M(ReplicatedFetch, "Number of data parts being fetched from replica") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 993e9d6a85cd..2c29449707c0 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -344,6 +344,10 @@ M(ExportPartitionZooKeeperMulti, "Number of 'multi' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ M(ExportPartitionZooKeeperExists, "Number of 'exists' requests to ZooKeeper made by the export partition feature.", ValueType::Number) \ M(ExportPartsRejectedByMemoryLimit, "Number of background export part tasks rejected due to background memory limit.", ValueType::Number) \ + M(ExportPartitionLockReadWaitMicroseconds, "Total time spent waiting to acquire the export partition in-memory state lock for reading (shared).", ValueType::Microseconds) \ + M(ExportPartitionLockWriteWaitMicroseconds, "Total time spent waiting to acquire the export partition in-memory state lock for writing (exclusive).", ValueType::Microseconds) \ + M(ExportPartitionLockReadAcquisitions, "Number of times the export partition in-memory state lock was acquired for reading (shared).", ValueType::Number) \ + M(ExportPartitionLockWriteAcquisitions, "Number of times the export partition in-memory state lock was acquired for writing (exclusive).", ValueType::Number) \ \ M(DistributedConnectionTries, "Total count of distributed connection attempts.", ValueType::Number) \ M(DistributedConnectionUsable, "Total count of successful distributed connections to a usable server (with required table, but maybe stale).", ValueType::Number) \ diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 8ba8d8fc64a0..57741ece593d 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -287,7 +287,7 @@ std::vector ExportPartitionManifestUpdatingTask:: std::vector snapshots; { - std::lock_guard lock(storage.export_merge_tree_partition_mutex); + auto lock = ExportPartitionUtils::lockShared(storage.export_merge_tree_partition_mutex); snapshots.reserve(storage.export_merge_tree_partition_task_entries_by_key.size()); for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) @@ -356,7 +356,9 @@ void ExportPartitionManifestUpdatingTask::poll() } { - std::lock_guard lock(storage.export_merge_tree_partition_mutex); + /// Task-serialization critical section: background_task_serialization_mutex is held + /// across the ZooKeeper reads below so poll() and handleStatusChanges() never overlap, + std::lock_guard task_guard(background_task_serialization_mutex); LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Polling for new entries for table {}. Current number of entries: {}", storage.getStorageID().getNameForLogs(), storage.export_merge_tree_partition_task_entries_by_key.size()); @@ -389,12 +391,6 @@ void ExportPartitionManifestUpdatingTask::poll() const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); - /// Read last_exception leaves (no watch). Surfacing exceptions in the system table relies - /// on this read being part of every poll cycle: per-part failures during PENDING do not - /// trigger a status watch, so the only refresh path while the task is still in-flight is - /// the periodic poll. An empty result collapses every "nothing actionable" case - /// (transient ZK error, no children, all leaves ZNONODE/malformed) into a no-op so the - /// in-memory copy stays intact. auto last_exception_per_replica = readLastExceptionPerReplica( zk, fs::path(entry_path), key, storage.log.load()); @@ -402,37 +398,43 @@ void ExportPartitionManifestUpdatingTask::poll() /// If the zk entry has been replaced with export_merge_tree_partition_force_export, checking only for the export key is not enough /// we need to make sure it is the same transaction id. If it is not, it needs to be replaced. - bool has_local_entry_and_is_up_to_date = local_entry != entries_by_key.end() + bool has_local_entry = local_entry != entries_by_key.end() && local_entry->manifest.transaction_id == metadata.transaction_id; - /// If the entry is up to date and we don't have the cleanup lock, refresh the in-memory - /// last_exception (surfaced by system.replicated_partition_exports) and early exit. - /// Direct mutation of the `mutable` field is safe under export_merge_tree_partition_mutex, - /// which is held throughout poll(). - if (!cleanup_lock && has_local_entry_and_is_up_to_date) - { - if (!last_exception_per_replica.empty()) - local_entry->last_exception_per_replica = std::move(last_exception_per_replica); - continue; - } + std::string status_string; - std::weak_ptr weak_manifest_updater = storage.export_merge_tree_partition_manifest_updater; + /// In theory, we should be notified when the status changes by the status watch + /// but in practice, the watch is not always reliable (e.g. if the ZooKeeper session is lost) + /// so we need to read the status from the ZK node directly. + if (has_local_entry) + { + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - auto status_watch_callback = std::make_shared([weak_manifest_updater, key](const Coordination::WatchResponse &) + zk->tryGet(fs::path(entry_path) / "status", status_string); + } + else { - /// If the table is dropped but the watch is not removed, we need to prevent use after free - /// below code assumes that if manifest updater is still alive, the status handling task is also alive - if (auto manifest_updater = weak_manifest_updater.lock()) + /// If we don't have a local entry, we need to arm a status watch to be notified when the status changes + std::weak_ptr weak_manifest_updater = storage.export_merge_tree_partition_manifest_updater; + auto status_watch_callback = std::make_shared([weak_manifest_updater, key](const Coordination::WatchResponse &) { - manifest_updater->addStatusChange(key); - manifest_updater->storage.export_merge_tree_partition_status_handling_task->schedule(); - } - }); + /// If the table is dropped but the watch is not removed, we need to prevent use after free + /// below code assumes that if manifest updater is still alive, the status handling task is also alive + if (auto manifest_updater = weak_manifest_updater.lock()) + { + manifest_updater->addStatusChange(key); + manifest_updater->storage.export_merge_tree_partition_status_handling_task->schedule(); + } + }); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetWatch); - std::string status_string; - if (!zk->tryGetWatch(fs::path(entry_path) / "status", status_string, nullptr, status_watch_callback)) + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetWatch); + + zk->tryGetWatch(fs::path(entry_path) / "status", status_string, nullptr, status_watch_callback); + } + + if (status_string.empty()) { LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing status", key); continue; @@ -461,17 +463,30 @@ void ExportPartitionManifestUpdatingTask::poll() deferred_commits); } - if (has_local_entry_and_is_up_to_date) + if (!has_local_entry) { - /// Same refresh as the early-exit branch above; we also reach this point when - /// holding the cleanup lock (cleanup did not consume the entry). - if (!last_exception_per_replica.empty()) - local_entry->last_exception_per_replica = std::move(last_exception_per_replica); - LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key); + addTask(metadata, *status, std::move(last_exception_per_replica), key, entries_by_key); + LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Added new entry for task {}", key); continue; } - addTask(metadata, *status, std::move(last_exception_per_replica), key, entries_by_key); + /// If we already have the local entry, we need to update it if the status has changed or if there are new last exceptions + const bool status_changed = local_entry->status != *status; + if (!last_exception_per_replica.empty() || status_changed) + { + auto mirror_lock = ExportPartitionUtils::lockExclusive(storage.export_merge_tree_partition_mutex); + if (!last_exception_per_replica.empty()) + local_entry->last_exception_per_replica = std::move(last_exception_per_replica); + if (status_changed) + { + local_entry->status = *status; + if (local_entry->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + /// terminal now - we no longer need to keep the data parts alive + local_entry->part_references.clear(); + } + } + LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key); + } /// Remove entries that were deleted by someone else @@ -479,9 +494,10 @@ void ExportPartitionManifestUpdatingTask::poll() LOG_INFO(storage.log, "ExportPartition Manifest Updating task: finished polling for new entries. Number of entries: {}", entries_by_key.size()); } - /// `export_merge_tree_partition_mutex` released here. Everything below runs without it - /// so concurrent readers of `system.replicated_partition_exports` and other writers are - /// not blocked by the (potentially slow) catalog round-trips below. + /// `background_task_serialization_mutex` released here (the mirror lock was only ever held + /// briefly, per mutation, inside the section above). Everything below runs without any lock + /// so concurrent readers of `system.replicated_partition_exports` and handleStatusChanges() + /// are not blocked by the (potentially slow) catalog round-trips below. /// /// `cleanup_lock` (the ZK ephemeral node) is INTENTIONALLY still held here and is only /// destructed at end of function. This preserves the existing cross-replica invariant: @@ -561,7 +577,10 @@ void ExportPartitionManifestUpdatingTask::addTask( } /// Insert or update entry. The multi_index container automatically maintains both indexes. + /// Only the container mutation takes the mirror lock (exclusively, briefly); the part-reference + /// gathering above ran lock-free. ExportReplicatedMergeTreePartitionTaskEntry entry {metadata, status, std::move(part_references), std::move(last_exception_per_replica)}; + auto mirror_lock = ExportPartitionUtils::lockExclusive(storage.export_merge_tree_partition_mutex); auto it = entries_by_key.find(key); if (it != entries_by_key.end()) entries_by_key.replace(it, entry); @@ -574,18 +593,23 @@ void ExportPartitionManifestUpdatingTask::removeStaleEntries( auto & entries_by_key ) { - for (auto it = entries_by_key.begin(); it != entries_by_key.end();) + /// Collect the stale keys first (read-only iteration; poll() is the sole container mutator + /// while it holds background_task_serialization_mutex). Then kill the local export tasks and + /// erase the entries, taking the mirror lock exclusively only for the brief erase and never + /// across killExportPart (which takes export_manifests_mutex). + std::vector> stale_keys; /// (composite key, transaction id) + for (const auto & entry : entries_by_key) { - const auto & key = it->getCompositeKey(); + const auto & key = entry.getCompositeKey(); if (zk_children.contains(key)) - { - ++it; continue; - } + stale_keys.emplace_back(key, entry.manifest.transaction_id); + } - const auto & transaction_id = it->manifest.transaction_id; + for (const auto & [key, transaction_id] : stale_keys) + { LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Export task {} was deleted, calling killExportPartition for transaction {}", key, transaction_id); - + try { storage.killExportPart(transaction_id); @@ -595,7 +619,10 @@ void ExportPartitionManifestUpdatingTask::removeStaleEntries( tryLogCurrentException(storage.log, __PRETTY_FUNCTION__); } - it = entries_by_key.erase(it); + auto mirror_lock = ExportPartitionUtils::lockExclusive(storage.export_merge_tree_partition_mutex); + auto it = entries_by_key.find(key); + if (it != entries_by_key.end()) + entries_by_key.erase(it); } } @@ -616,7 +643,12 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() try { - std::lock_guard task_entries_lock(storage.export_merge_tree_partition_mutex); + /// Task-serialization critical section: background_task_serialization_mutex serializes + /// this against poll() and is held across the ZooKeeper status reads below. No catalog I/O + /// happens here. Each in-memory entry mutation takes export_merge_tree_partition_mutex + /// exclusively for the brief write only, so the shared-lock reader of + /// system.replicated_partition_exports is never blocked by a ZooKeeper round-trip. + std::lock_guard task_guard(background_task_serialization_mutex); auto zk = storage.getZooKeeper(); LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status changes. Number of status changes: {}", local_status_changes.size()); @@ -660,20 +692,17 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() LOG_INFO(storage.log, "ExportPartition Manifest Updating task: status changed for task {}. New status: {}", key, magic_enum::enum_name(*new_status).data()); - /// Refresh last_exception leaves too. Status transitions to FAILED (via commit budget) - /// and KILLED (via timeout) atomically write a per-replica leaf in the same multi, so - /// reading them here ensures the system table surfaces the cause together with the - /// visible state change. No new watch is added — this piggybacks on the existing - /// status watch. An empty result means "nothing actionable" and leaves the previous - /// snapshot intact. - if (auto fetched = readLastExceptionPerReplica( - zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load()); - !fetched.empty()) - { - it->last_exception_per_replica = std::move(fetched); - } - - /// If status changed to KILLED, cancel local export operations + /// Refresh last_exception leaves too (ZooKeeper read, lock-free). Status transitions to + /// FAILED (via commit budget) and KILLED (via timeout) atomically write a per-replica + /// leaf in the same multi, so reading them here ensures the system table surfaces the + /// cause together with the visible state change. No new watch is added — this piggybacks + /// on the existing status watch. An empty result means "nothing actionable" and leaves + /// the previous snapshot intact. + auto fetched = readLastExceptionPerReplica( + zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load()); + + /// If status changed to KILLED, cancel local export operations. killExportPart takes + /// export_manifests_mutex (not the mirror lock), so call it without the mirror lock held. if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED) { try @@ -687,12 +716,20 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges() } } - it->status = *new_status; - - if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + /// Apply the in-memory updates under a brief exclusive mirror lock. { - /// we no longer need to keep the data parts alive - it->part_references.clear(); + auto mirror_lock = ExportPartitionUtils::lockExclusive(storage.export_merge_tree_partition_mutex); + + if (!fetched.empty()) + it->last_exception_per_replica = std::move(fetched); + + it->status = *new_status; + + if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + { + /// we no longer need to keep the data parts alive + it->part_references.clear(); + } } local_status_changes.pop(); diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h index 32487f2dc68c..2bf3cf01bd1b 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h @@ -45,6 +45,15 @@ class ExportPartitionManifestUpdatingTask std::mutex status_changes_mutex; std::queue status_changes; + + /// Serializes the full bodies of poll() and handleStatusChanges() against each other. + /// Held across ZooKeeper I/O so those two tasks never overlap; the mirror lock + /// (StorageReplicatedMergeTree::export_merge_tree_partition_mutex) is then taken only + /// briefly under this, for the in-memory container mutations. This is what lets the + /// system.replicated_partition_exports reader (which takes the mirror lock shared and + /// briefly) avoid waiting behind slow-network ZooKeeper round-trips. + /// Lock ordering: this -> export_merge_tree_partition_mutex -> export_manifests_mutex. + std::mutex background_task_serialization_mutex; }; } diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index d722fb77b2c0..f61303f6a65f 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -77,12 +77,39 @@ void ExportPartitionTaskScheduler::run() const uint32_t seed = uint32_t(std::hash{}(storage.replica_name)) ^ uint32_t(scheduled_exports_count); pcg64_fast rng(seed); - std::lock_guard lock(storage.export_merge_tree_partition_mutex); + /// Snapshot the PENDING entries under a brief shared lock, then perform all ZooKeeper + /// work and exportPartToTable below WITHOUT holding the lock. The scheduler is a pure + /// reader of the in-memory mirror - it no longer writes entry.status (the status + /// converges via the status watch -> handleStatusChanges and poll()), so a shared lock + /// is sufficient and the system.replicated_partition_exports reader is never blocked by + /// the scheduler. + struct PendingEntrySnapshot + { + std::string key; + ExportReplicatedMergeTreePartitionManifest manifest; + }; + + std::vector pending_entries; + { + auto lock = ExportPartitionUtils::lockShared(storage.export_merge_tree_partition_mutex); + + // Iterate sorted by create_time + for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time) + { + /// No need to query zk for status if the local one is not PENDING + if (entry.status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Local status is {}", magic_enum::enum_name(entry.status).data()); + continue; + } + + pending_entries.push_back(PendingEntrySnapshot{entry.getCompositeKey(), entry.manifest}); + } + } auto zk = storage.getZooKeeper(); - // Iterate sorted by create_time - for (auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time) + for (const auto & pending : pending_entries) { if (scheduled_exports_count >= available_move_executors) { @@ -90,18 +117,11 @@ void ExportPartitionTaskScheduler::run() break; } - const auto & manifest = entry.manifest; - const auto key = entry.getCompositeKey(); + const auto & manifest = pending.manifest; + const auto & key = pending.key; const auto database = storage.getContext()->resolveDatabase(manifest.destination_database); const auto & table = manifest.destination_table; - /// No need to query zk for status if the local one is not PENDING - if (entry.status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Local status is {}", magic_enum::enum_name(entry.status).data()); - continue; - } - const auto destination_storage_id = StorageID(QualifiedTableName {database, table}); const auto destination_storage = DatabaseCatalog::instance().tryGetTable(destination_storage_id, storage.getContext()); @@ -131,8 +151,7 @@ void ExportPartitionTaskScheduler::run() if (status_in_zk.value() != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) { - entry.status = status_in_zk.value(); - LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping {}... Status from zk is {}", key, magic_enum::enum_name(entry.status).data()); + LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping {}... Status from zk is {}", key, magic_enum::enum_name(status_in_zk.value()).data()); continue; } diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 679e07d0b132..d6928a0f9ecd 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include #include @@ -28,6 +30,16 @@ namespace ProfileEvents extern const Event ExportPartitionZooKeeperGetChildren; extern const Event ExportPartitionZooKeeperSet; extern const Event ExportPartitionZooKeeperMulti; + extern const Event ExportPartitionLockReadWaitMicroseconds; + extern const Event ExportPartitionLockWriteWaitMicroseconds; + extern const Event ExportPartitionLockReadAcquisitions; + extern const Event ExportPartitionLockWriteAcquisitions; +} + +namespace CurrentMetrics +{ + extern const Metric ExportPartitionLockWaitingReaders; + extern const Metric ExportPartitionLockWaitingWriters; } namespace DB @@ -57,6 +69,26 @@ namespace fs = std::filesystem; namespace ExportPartitionUtils { + std::shared_lock lockShared(SharedMutex & mutex) + { + CurrentMetrics::Increment waiting(CurrentMetrics::ExportPartitionLockWaitingReaders); + Stopwatch watch; + std::shared_lock lock(mutex); + ProfileEvents::increment(ProfileEvents::ExportPartitionLockReadWaitMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::ExportPartitionLockReadAcquisitions); + return lock; + } + + std::unique_lock lockExclusive(SharedMutex & mutex) + { + CurrentMetrics::Increment waiting(CurrentMetrics::ExportPartitionLockWaitingWriters); + Stopwatch watch; + std::unique_lock lock(mutex); + ProfileEvents::increment(ProfileEvents::ExportPartitionLockWriteWaitMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::ExportPartitionLockWriteAcquisitions); + return lock; + } + Block getPartitionSourceBlockForIcebergCommit( MergeTreeData & storage, const String & partition_id) { diff --git a/src/Storages/MergeTree/ExportPartitionUtils.h b/src/Storages/MergeTree/ExportPartitionUtils.h index 0434bc59a2cb..15e130a6873e 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.h +++ b/src/Storages/MergeTree/ExportPartitionUtils.h @@ -3,8 +3,11 @@ #include #include #include +#include +#include #include #include +#include #include #include "Storages/IStorage.h" #include @@ -23,6 +26,15 @@ struct ExportReplicatedMergeTreePartitionManifest; namespace ExportPartitionUtils { + /// Instrumented acquisition of the storage-wide export partition state lock. + /// `lockShared` is for readers (e.g. system.replicated_partition_exports); `lockExclusive` + /// is for the brief in-memory mutations performed by the background tasks and KILL EXPORT + /// PARTITION. Both record wait time and waiting-thread counts via the ExportPartitionLock* + /// metrics. The lock MUST NOT be held across ZooKeeper round-trips - gather ZK data first, + /// then take the lock only to apply the result. + std::shared_lock lockShared(SharedMutex & mutex); + std::unique_lock lockExclusive(SharedMutex & mutex); + std::vector getExportedPaths(const LoggerPtr & log, const zkutil::ZooKeeperPtr & zk, const std::string & export_path); ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 046399f0bdfe..8286b91507a1 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -6132,7 +6132,7 @@ void StorageReplicatedMergeTree::shutdown(bool) } { - std::lock_guard lock(export_merge_tree_partition_mutex); + auto lock = ExportPartitionUtils::lockExclusive(export_merge_tree_partition_mutex); export_merge_tree_partition_task_entries.clear(); } @@ -10070,6 +10070,8 @@ CancellationCode StorageReplicatedMergeTree::killExportPartition(const String & /// Called from a query thread (KILL EXPORT PARTITION via InterpreterKillQueryQuery), which does not have a component set. auto component_guard = Coordination::setCurrentComponent("StorageReplicatedMergeTree::killExportPartition"); + /// This is best-effort, even if we manage to set it to killed, it might be overwritten by a successful commit. + auto try_set_status_to_killed = [this](const zkutil::ZooKeeperPtr & zk, const std::string & status_path) { Coordination::Stat stat; @@ -10105,23 +10107,39 @@ CancellationCode StorageReplicatedMergeTree::killExportPartition(const String & return CancellationCode::CancelSent; }; - std::lock_guard lock(export_merge_tree_partition_mutex); - const auto zk = getZooKeeper(); + /// Look up the entry in the in-memory mirror under a brief shared lock and copy out what we + /// need; release it before any ZooKeeper round-trip so the system.replicated_partition_exports + /// reader is never blocked. This is a pure read of the container, so a shared lock is enough - + /// the KILLED status set in ZooKeeper below propagates back into the mirror via the status + /// watch -> handleStatusChanges. + bool local_entry_found = false; + bool local_entry_pending = false; + std::string local_composite_key; + { + auto lock = ExportPartitionUtils::lockShared(export_merge_tree_partition_mutex); + const auto entry = export_merge_tree_partition_task_entries_by_transaction_id.find(transaction_id); + if (entry != export_merge_tree_partition_task_entries_by_transaction_id.end()) + { + local_entry_found = true; + local_entry_pending = entry->status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING; + local_composite_key = entry->getCompositeKey(); + } + } + /// if we have the entry locally, no need to list from zk. we can save some requests. - const auto & entry = export_merge_tree_partition_task_entries_by_transaction_id.find(transaction_id); - if (entry != export_merge_tree_partition_task_entries_by_transaction_id.end()) + if (local_entry_found) { LOG_INFO(log, "Export partition task found locally, trying to cancel it"); /// found locally, no need to get children on zk - if (entry->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) + if (!local_entry_pending) { LOG_INFO(log, "Export partition task is not pending, can not cancel it"); return CancellationCode::CancelCannotBeSent; } - return try_set_status_to_killed(zk, fs::path(zookeeper_path) / "exports" / entry->getCompositeKey() / "status"); + return try_set_status_to_killed(zk, fs::path(zookeeper_path) / "exports" / local_composite_key / "status"); } else { diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 30b038901e96..86e590404f2b 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -525,7 +526,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData Coordination::WatchCallbackPtr export_merge_tree_partition_watch_callback; - std::mutex export_merge_tree_partition_mutex; + mutable SharedMutex export_merge_tree_partition_mutex; BackgroundSchedulePoolTaskHolder export_merge_tree_partition_select_task;