diff --git a/docs/en/antalya/partition_export.md b/docs/en/antalya/partition_export.md
index 975915859482..5365ed90e353 100644
--- a/docs/en/antalya/partition_export.md
+++ b/docs/en/antalya/partition_export.md
@@ -6,7 +6,7 @@ The `ALTER TABLE EXPORT PARTITION` command exports entire partitions from Replic
 
 The set of parts that are exported is based on the list of parts the replica that received the export command sees. The other replicas will assist in the export process if they have those parts locally. Otherwise they will ignore it.
 
-The partition export tasks can be observed through `system.replicated_partition_exports`. Querying this table results in a query to ZooKeeper, so it must be used with care. Individual part export progress can be observed as usual through `system.exports`.
+The partition export tasks can be observed through `system.replicated_partition_exports`. The table is served from each replica's in-memory mirror, so queries do not contact ZooKeeper and are cheap to run. The mirror is refreshed on the manifest-updater poll cycle and on every status change, so a freshly written exception or terminal state may take up to one poll interval to appear. Individual part export progress can be observed as usual through `system.exports`.
 
 The same partition can not be exported to the same destination more than once. There are two ways to override this behavior: either by setting the `export_merge_tree_partition_force_export` setting or waiting for the task to expire.
 
@@ -105,7 +105,7 @@ TO TABLE [destination_database.]destination_table
 - **Type**: `UInt64`
 - **Default**: `3600`
 - **Description**: The timeout is measured from the manifest's create_time. Set to 0 to disable the timeout.
-When the timeout is exceeded the task transitions to KILLED (same terminal state as `KILL QUERY ... EXPORT PARTITION`), and `last_exception` is populated with a timeout reason.
+When the timeout is exceeded the task transitions to KILLED (same terminal state as `KILL QUERY ... EXPORT PARTITION`), and a `last_exception_per_replica` entry on the replica that fires the timeout is populated with a timeout reason.
 
 Notes:
 - Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation.
@@ -170,9 +170,7 @@ parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0']
 parts_count: 3
 parts_to_do: 0
 status: COMPLETED
-exception_replica:
-last_exception:
-exception_part:
+last_exception_per_replica: []
 exception_count: 0
 
 Row 2:
@@ -189,9 +187,7 @@ parts: ['2021_0_0_0']
 parts_count: 1
 parts_to_do: 0
 status: COMPLETED
-exception_replica:
-last_exception:
-exception_part:
+last_exception_per_replica: []
 exception_count: 0
 
 2 rows in set. Elapsed: 0.019 sec.
@@ -205,6 +201,20 @@ Status values include:
 - `COMPLETED` - Export completed
 - `FAILED` - Export failed
 - `KILLED` - Export was cancelled
 
+### Exception columns
+
+- `last_exception_per_replica` is an `Array(Tuple(replica String, message String, part String, time DateTime, count UInt64))`. Each tuple is the most recent exception observed by a single replica plus a best-effort within-replica `count`. Replicas that have never reported an exception are omitted.
+- `exception_count` is the sum of every `count` in `last_exception_per_replica`. Each replica owns its own counter, so cross-replica updates do not race; the sum is exact w.r.t. the snapshot returned. Within a single replica concurrent failing writers may under-count by one.
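+
+Since each array element is a named tuple, `ARRAY JOIN` can flatten the column into one row per reporting replica, which makes it easy to spot the replica that keeps failing. A small sketch, reusing the example table names from this page:
+
+```sql
+SELECT ex.replica, ex.part, ex.count, ex.message
+FROM system.replicated_partition_exports
+ARRAY JOIN last_exception_per_replica AS ex
+WHERE source_table = 'rmt_table' AND destination_table = 's3_table'
+ORDER BY ex.time DESC;
+```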
+
+To pick the latest exception across replicas:
+
+```sql
+SELECT
+    arrayReverseSort(x -> x.time, last_exception_per_replica)[1] AS latest_exception
+FROM system.replicated_partition_exports
+WHERE source_table = 'rmt_table' AND destination_table = 's3_table';
+```
+
 ## Related Features
 
 - [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated)
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index f0ed3b9aaa7a..fc29b8681dbc 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -7404,10 +7404,6 @@ Throw an error if there are pending patch parts when exporting a merge tree part
     DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"(
Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list. On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit.
-)", 0) \
-    DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, false, R"(
-Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information.
-Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled.
 )", 0) \
     DECLARE(Timezone, iceberg_partition_timezone, "", R"(
Time zone by which partitioning of Iceberg tables was performed.
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index a1b03a639ab8..c37857f5d546 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -262,7 +262,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
             {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
             {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
             {"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
-            {"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
             {"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
             {"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
             {"object_storage_cluster", "", "", "Antalya: New setting"},
diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h
index f1c96b120a28..65a2d860868d 100644
--- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h
+++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -60,6 +60,60 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry
     }
 };
 
+/// Per-replica "last exception" record persisted under the task's
+/// last_exception container (one znode per replica per export task).
+///
+/// Updated atomically with the surrounding state transition (status flip /
+/// lock release / retry counter bump) via a single `tryMulti` op: a Create on
+/// the replica's first failure, a Set afterwards. The `count` field is
+/// best-effort and non-atomic: writers `tryGet` the current value and write
+/// `count + 1` back without a version check, so concurrent writers within the
+/// same replica may under-count. This matches the semantics used by
+/// `commit_attempts` and is documented in the system table.
+struct LastExceptionEntry
+{
+    String message;
+    String part; /// empty for task-level exceptions (commit failure, timeout)
+    String replica;
+    time_t time = 0;
+    size_t count = 0;
+
+    std::string toJsonString() const
+    {
+        Poco::JSON::Object json;
+        json.set("message", message);
+        json.set("part", part);
+        json.set("replica", replica);
+        json.set("time", time);
+        json.set("count", count);
+        std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
+        oss.exceptions(std::ios::failbit);
+        Poco::JSON::Stringifier::stringify(json, oss);
+        return oss.str();
+    }
+
+    static LastExceptionEntry fromJsonString(const std::string & json_string)
+    {
+        LastExceptionEntry entry;
+        if (json_string.empty())
+            return entry;
+
+        Poco::JSON::Parser parser;
+        auto json = parser.parse(json_string).extract<Poco::JSON::Object::Ptr>();
+        chassert(json);
+
+        if (json->has("message"))
+            entry.message = json->getValue<String>("message");
+        if (json->has("part"))
+            entry.part = json->getValue<String>("part");
+        if (json->has("replica"))
+            entry.replica = json->getValue<String>("replica");
+        if (json->has("time"))
+            entry.time = json->getValue<time_t>("time");
+        if (json->has("count"))
+            entry.count = json->getValue<size_t>("count");
+        return entry;
+    }
+};
+
 struct ExportReplicatedMergeTreePartitionProcessedPartEntry
 {
     String part_name;
diff --git a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
index e62f7de99bed..8af873e0b89c 100644
--- a/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
+++ b/src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <map>
 #include
 #include
 #include "Core/QualifiedTableName.h"
@@ -32,6 +33,13 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
     /// There is also a chance this replica does not contain a given part and it is totally ok.
     mutable std::vector part_references;
 
+    /// In-memory mirror of the per-replica last_exception leaves in ZK,
+    /// keyed by replica name (verbatim, not escaped). Refreshed on every poll() cycle
+    /// and on every status-change handler invocation; served verbatim to
+    /// system.replicated_partition_exports without any extra ZK read.
+    /// An empty map means no replica has recorded an exception yet for this task.
+    mutable std::map<String, LastExceptionEntry> last_exception_per_replica;
+
     std::string getCompositeKey() const
     {
         const auto qualified_table_name = QualifiedTableName {manifest.destination_database, manifest.destination_table};
diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
index 0ed8d1033135..106dff071188 100644
--- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
+++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include <Common/escapeForFileName.h>
 #include
 #include
 
@@ -36,6 +37,86 @@ namespace FailPoints
 
 namespace
 {
+    /// Fetch all per-replica last_exception leaves under the task's last_exception container
+    /// and build a fresh map keyed by replica name. The map key prefers the unescaped
+    /// `replica` field embedded in the JSON payload; if it is missing or empty, the leaf
+    /// name is unescaped as a fallback.
+    ///
+    /// An empty result means "nothing actionable": either the parent getChildren failed (ZK
+    /// glitch), the container has no children yet (no replica has reported), or every leaf
+    /// fetch came back ZNONODE / malformed. Callers MUST skip the assignment in that case to
+    /// preserve the in-memory mirror across transient errors. This is safe because per-replica
+    /// leaves are never individually removed — the entire entry path is wiped recursively when
+    /// a task is cleaned up, which is handled separately by removeStaleEntries.
+    std::map<String, LastExceptionEntry> readLastExceptionPerReplica(
+        const zkutil::ZooKeeperPtr & zk,
+        const std::filesystem::path & entry_path,
+        const std::string & log_key,
+        const LoggerPtr & log)
+    {
+        std::map<String, LastExceptionEntry> out;
+
+        const auto container_path = entry_path / "last_exception";
+
+        Strings children;
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
+        if (Coordination::Error::ZOK != zk->tryGetChildren(container_path, children))
+        {
+            LOG_INFO(log, "ExportPartition Manifest Updating Task: failed to list last_exception leaves for {}, leaving in-memory copy untouched", log_key);
+            return out;
+        }
+
+        if (children.empty())
+            return out;
+
+        std::vector<std::string> paths;
+        paths.reserve(children.size());
+        for (const auto & child : children)
+            paths.emplace_back(container_path / child);
+
+        /// One MULTI_READ when supported, parallel async gets otherwise. See
+        /// ZooKeeper::multiRead in src/Common/ZooKeeper/ZooKeeper.h.
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
+        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet, paths.size());
+        auto responses = zk->tryGet(paths);
+        responses.waitForResponses();
+
+        for (size_t i = 0; i < paths.size(); ++i)
+        {
+            Coordination::GetResponse response;
+            try
+            {
+                /// MultiTryGetResponse::operator[] swallows ZNONODE but rethrows on
+                /// other errors; treat any unexpected Keeper error as "skip this
+                /// leaf, retry on the next poll". Matches the lenient semantics of
+                /// the previous per-leaf tryGet implementation.
+                response = responses[i];
+            }
+            catch (...)
+            {
+                LOG_WARNING(log, "ExportPartition Manifest Updating Task: ZK error fetching last_exception leaf {} for {}, skipping", children[i], log_key);
+                continue;
+            }
+
+            if (response.error != Coordination::Error::ZOK)
+                continue; /// ZNONODE: child concurrently removed (recursive cleanup race).
+
+            try
+            {
+                auto entry = LastExceptionEntry::fromJsonString(response.data);
+                String replica = entry.replica.empty() ? unescapeForFileName(children[i]) : entry.replica;
+                out.emplace(std::move(replica), std::move(entry));
+            }
+            catch (...)
+            {
+                LOG_WARNING(log, "ExportPartition Manifest Updating Task: malformed last_exception JSON for {} (leaf {}), ignoring", log_key, children[i]);
+            }
+        }
+
+        return out;
+    }
+
     /*
         Remove expired entries and fix non-committed exports that have already exported all parts.
@@ -178,10 +259,13 @@
                     /// Bump commit-attempts counter; transition to FAILED once the budget is exhausted.
                     /// This is the primary retry path for the commit phase — handlePartExportSuccess
                     /// only fires once (on the last part's completion); subsequent retries come from here.
+                    /// The exception is recorded in the per-replica last_exception leaf inside the same multi.
                     const bool became_failed = ExportPartitionUtils::handleCommitFailure(
                         zk,
                         entry_path,
                         metadata.max_retries,
+                        storage.getReplicaName(),
+                        e.message(),
                         log);
 
                     if (became_failed)
@@ -214,348 +298,13 @@ ExportPartitionManifestUpdatingTask::ExportPartitionManifestUpdatingTask(Storage
 
 std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::getPartitionExportsInfo() const
 {
-    std::vector<ReplicatedPartitionExportInfo> infos;
-    const auto zk = storage.getZooKeeper();
-
-    const auto exports_path = fs::path(storage.zookeeper_path) / "exports";
-
-    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
-    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
-
-    std::vector<std::string> children;
-    if (Coordination::Error::ZOK != zk->tryGetChildren(exports_path, children))
-    {
-        LOG_INFO(storage.log, "Failed to get children from exports path, returning empty export info list");
-        return infos;
-    }
-
-    if (children.empty())
-        return infos;
-
-    /// Batch all metadata.json, status gets, and getChildren operations in a single multi request
-    Coordination::Requests requests;
-    requests.reserve(children.size() * 4); // metadata, status, processing, exceptions_per_replica
-
-    // Track response indices for each child
-    struct ChildResponseIndices
-    {
-        size_t metadata_idx;
-        size_t status_idx;
-        size_t processing_idx;
-        size_t exceptions_per_replica_idx;
-    };
-    std::vector<ChildResponseIndices> response_indices;
-    response_indices.reserve(children.size());
-
-    for (const auto & child : children)
-    {
-        const auto export_partition_path = fs::path(exports_path) / child;
-
-        ChildResponseIndices indices;
-        indices.metadata_idx = requests.size();
-        requests.push_back(zkutil::makeGetRequest(export_partition_path / "metadata.json"));
-
-        indices.status_idx = requests.size();
-        requests.push_back(zkutil::makeGetRequest(export_partition_path / "status"));
-
-        indices.processing_idx = requests.size();
-        requests.push_back(zkutil::makeListRequest(export_partition_path / "processing"));
-
-        indices.exceptions_per_replica_idx = requests.size();
-        requests.push_back(zkutil::makeListRequest(export_partition_path / "exceptions_per_replica"));
-
-        response_indices.push_back(indices);
-    }
-
-    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
-    ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
-
-    Coordination::Responses responses;
-    Coordination::Error code = zk->tryMulti(requests, responses);
-
-    if (code != Coordination::Error::ZOK)
-    {
-        LOG_INFO(storage.log, "Failed to execute multi request for export partition info, error: {}", code);
-        return infos;
-    }
-
-    // Helper to extract GetResponse data
-    auto getGetResponseData = [&responses](size_t idx) -> std::pair<Coordination::Error, std::string>
-    {
-        if (idx >= responses.size())
-            return {Coordination::Error::ZRUNTIMEINCONSISTENCY, ""};
-
-        const auto * get_response = dynamic_cast<const Coordination::GetResponse *>(responses[idx].get());
-        if (!get_response)
-            return {Coordination::Error::ZRUNTIMEINCONSISTENCY, ""};
-
-        return {get_response->error, get_response->data};
-    };
-
-    // Helper to extract ListResponse data
-    auto getListResponseData = [&responses](size_t idx) -> std::pair<Coordination::Error, Strings>
-    {
-        if (idx >= responses.size())
-            return {Coordination::Error::ZRUNTIMEINCONSISTENCY, Strings{}};
-
-        const auto * list_response = dynamic_cast<const Coordination::ListResponse *>(responses[idx].get());
-        if (!list_response)
-            return {Coordination::Error::ZRUNTIMEINCONSISTENCY, Strings{}};
-
-        return {list_response->error, list_response->names};
-    };
-
-    // Create response wrappers matching the MultiTryGetResponse/MultiTryGetChildrenResponse interface
-    struct ResponseWrapper
-    {
-        Coordination::Error error;
-        std::string data;
-        Strings names;
-
-        ResponseWrapper(Coordination::Error err, const std::string & d, const Strings & n)
-            : error(err), data(d), names(n) {}
-    };
-
-    std::vector<ResponseWrapper> metadata_responses_wrapper;
-    std::vector<ResponseWrapper> status_responses_wrapper;
-    std::vector<ResponseWrapper> processing_responses_wrapper;
-    std::vector<ResponseWrapper> exceptions_per_replica_responses_wrapper;
-
-    metadata_responses_wrapper.reserve(children.size());
-    status_responses_wrapper.reserve(children.size());
-    processing_responses_wrapper.reserve(children.size());
-    exceptions_per_replica_responses_wrapper.reserve(children.size());
-
-    for (size_t child_idx = 0; child_idx < children.size(); ++child_idx)
-    {
-        const auto & indices = response_indices[child_idx];
-
-        // Extract metadata response
-        auto [metadata_error, metadata_data] = getGetResponseData(indices.metadata_idx);
-        metadata_responses_wrapper.emplace_back(metadata_error, metadata_data, Strings{});
-
-        // Extract status response
-        auto [status_error, status_data] = getGetResponseData(indices.status_idx);
-        status_responses_wrapper.emplace_back(status_error, status_data, Strings{});
-
-        // Extract processing response
-        auto [processing_error, processing_names] = getListResponseData(indices.processing_idx);
-        processing_responses_wrapper.emplace_back(processing_error, "", processing_names);
-
-        // Extract exceptions_per_replica response
-        auto [exceptions_error, exceptions_names] = getListResponseData(indices.exceptions_per_replica_idx);
-        exceptions_per_replica_responses_wrapper.emplace_back(exceptions_error, "", exceptions_names);
-    }
-
-    // Use wrapper vectors directly - they match the interface expected by the code below
-    auto & metadata_responses = metadata_responses_wrapper;
-    auto & status_responses = status_responses_wrapper;
-    auto & processing_responses = processing_responses_wrapper;
-    auto & exceptions_per_replica_responses = exceptions_per_replica_responses_wrapper;
-
-    /// Collect all exception replica paths for batching
-    struct ExceptionReplicaPath
-    {
-        size_t child_idx;
-        std::string replica;
-        std::string count_path;
-        std::string exception_path;
-        std::string part_path;
-    };
-
-    std::vector<ExceptionReplicaPath> exception_replica_paths;
-    for (size_t child_idx = 0; child_idx < children.size(); ++child_idx)
-    {
-        const auto & child = children[child_idx];
-        const auto export_partition_path = fs::path(exports_path) / child;
-        /// Check if we got valid responses
-        if (metadata_responses[child_idx].error != Coordination::Error::ZOK)
-        {
-            LOG_INFO(storage.log, "Skipping {}: missing metadata.json", child);
-            continue;
-        }
-        if (status_responses[child_idx].error != Coordination::Error::ZOK)
-        {
-            LOG_INFO(storage.log, "Skipping {}: missing status", child);
-            continue;
-        }
-        if (processing_responses[child_idx].error != Coordination::Error::ZOK)
-        {
-            LOG_INFO(storage.log, "Skipping {}: missing processing parts", child);
-            continue;
-        }
-        if (exceptions_per_replica_responses[child_idx].error != Coordination::Error::ZOK)
-        {
-            LOG_INFO(storage.log, "Skipping {}: missing exceptions_per_replica", export_partition_path);
-            continue;
-        }
-        const auto exceptions_per_replica_path = export_partition_path / "exceptions_per_replica";
-        const auto & exception_replicas = exceptions_per_replica_responses[child_idx].names;
-        for (const auto & replica : exception_replicas)
-        {
-            const auto last_exception_path = exceptions_per_replica_path / replica / "last_exception";
-            exception_replica_paths.push_back({
-                child_idx,
-                replica,
-                (exceptions_per_replica_path / replica / "count").string(),
-                (last_exception_path / "exception").string(),
-                (last_exception_path / "part").string()
-            });
-        }
-    }
-    /// Batch get all exception data in a single multi request
-    std::map<size_t, std::vector<std::tuple<std::string, std::string, std::string, std::string>>> exception_data_by_child;
-
-    if (!exception_replica_paths.empty())
-    {
-        Coordination::Requests exception_requests;
-        exception_requests.reserve(exception_replica_paths.size() * 3); // count, exception, part for each
-
-        // Track response indices for each exception replica path
-        struct ExceptionResponseIndices
-        {
-            size_t count_idx;
-            size_t exception_idx;
-            size_t part_idx;
-        };
-        std::vector<ExceptionResponseIndices> exception_response_indices;
-        exception_response_indices.reserve(exception_replica_paths.size());
-
-        for (const auto & erp : exception_replica_paths)
-        {
-            ExceptionResponseIndices indices;
-            indices.count_idx = exception_requests.size();
-            exception_requests.push_back(zkutil::makeGetRequest(erp.count_path));
-
-            indices.exception_idx = exception_requests.size();
-            exception_requests.push_back(zkutil::makeGetRequest(erp.exception_path));
-
-            indices.part_idx = exception_requests.size();
-            exception_requests.push_back(zkutil::makeGetRequest(erp.part_path));
-
-            exception_response_indices.push_back(indices);
-        }
-
-        // Execute single multi request for all exception data
-        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
-        ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
-
-        Coordination::Responses exception_responses;
-        Coordination::Error exception_code = zk->tryMulti(exception_requests, exception_responses);
-
-        if (exception_code != Coordination::Error::ZOK)
-        {
-            LOG_INFO(storage.log, "Failed to execute multi request for exception data, error: {}", exception_code);
-        }
-        else
-        {
-            // Parse exception responses
-            for (size_t exception_path_idx = 0; exception_path_idx < exception_replica_paths.size(); ++exception_path_idx)
-            {
-                const auto & erp = exception_replica_paths[exception_path_idx];
-                const auto & indices = exception_response_indices[exception_path_idx];
-
-                std::string count_str;
-                std::string exception_str;
-                std::string part_str;
-
-                // Extract count response
-                if (indices.count_idx < exception_responses.size())
-                {
-                    const auto * count_response = dynamic_cast<const Coordination::GetResponse *>(exception_responses[indices.count_idx].get());
-                    if (count_response && count_response->error == Coordination::Error::ZOK)
-                        count_str = count_response->data;
-                }
-
-                // Extract exception response
-                if (indices.exception_idx < exception_responses.size())
-                {
-                    const auto * exception_response = dynamic_cast<const Coordination::GetResponse *>(exception_responses[indices.exception_idx].get());
-                    if (exception_response && exception_response->error == Coordination::Error::ZOK)
-                        exception_str = exception_response->data;
-                }
-
-                // Extract part response
-                if (indices.part_idx < exception_responses.size())
-                {
-                    const auto * part_response = dynamic_cast<const Coordination::GetResponse *>(exception_responses[indices.part_idx].get());
-                    if (part_response && part_response->error == Coordination::Error::ZOK)
-                        part_str = part_response->data;
-                }
-
-                exception_data_by_child[erp.child_idx].emplace_back(erp.replica, count_str, exception_str, part_str);
-            }
-        }
-    }
-
-    /// Build the result
-    for (size_t child_idx = 0; child_idx < children.size(); ++child_idx)
-    {
-        /// Skip if we already determined this child is invalid
-        if (metadata_responses[child_idx].error != Coordination::Error::ZOK
-            || status_responses[child_idx].error != Coordination::Error::ZOK
-            || processing_responses[child_idx].error != Coordination::Error::ZOK
-            || exceptions_per_replica_responses[child_idx].error != Coordination::Error::ZOK)
-        {
-            continue;
-        }
-
-        ReplicatedPartitionExportInfo info;
-        const auto metadata_json = metadata_responses[child_idx].data;
-        const auto status = status_responses[child_idx].data;
-        const auto processing_parts = processing_responses[child_idx].names;
-        const auto parts_to_do = processing_parts.size();
-        std::string exception_replica;
-        std::string last_exception;
-        std::string exception_part;
-        std::size_t exception_count = 0;
-        /// Process exception data
-        auto exception_data_it = exception_data_by_child.find(child_idx);
-        if (exception_data_it != exception_data_by_child.end())
-        {
-            for (const auto & [replica, count_str, exception_str, part_str] : exception_data_it->second)
-            {
-                if (!count_str.empty())
-                {
-                    exception_count += parse<UInt64>(count_str);
-                }
-                if (last_exception.empty() && !exception_str.empty() && !part_str.empty())
-                {
-                    exception_replica = replica;
-                    last_exception = exception_str;
-                    exception_part = part_str;
-                }
-            }
-        }
-
-        const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json);
-
-        info.destination_database = metadata.destination_database;
-        info.destination_table = metadata.destination_table;
-        info.partition_id = metadata.partition_id;
-        info.transaction_id = metadata.transaction_id;
-        info.query_id = metadata.query_id;
-        info.create_time = metadata.create_time;
-        info.source_replica = metadata.source_replica;
-        info.parts_count = metadata.number_of_parts;
-        info.parts_to_do = parts_to_do;
-        info.parts = metadata.parts;
-        info.status = status;
-        info.exception_replica = exception_replica;
-        info.last_exception = last_exception;
-        info.exception_part = exception_part;
-        info.exception_count = exception_count;
-        infos.emplace_back(std::move(info));
-    }
-
-    return infos;
-}
-
-std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::getPartitionExportsInfoLocal() const
-{
+    /// Strictly read from the in-memory mirror; no ZooKeeper traffic. The mirror is
+    /// kept up to date by poll() (periodic + parent-children watch) and by the existing
+    /// status-change handler. See the class header comment for the convergence guarantee.
     std::lock_guard lock(storage.export_merge_tree_partition_mutex);
 
     std::vector<ReplicatedPartitionExportInfo> infos;
+    infos.reserve(storage.export_merge_tree_partition_task_entries_by_key.size());
 
     for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key)
     {
@@ -573,6 +322,15 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
         info.parts = entry.manifest.parts;
         info.status = magic_enum::enum_name(entry.status);
 
+        info.last_exception_per_replica.reserve(entry.last_exception_per_replica.size());
+        size_t total_exception_count = 0;
+        for (const auto & [_, ex] : entry.last_exception_per_replica)
+        {
+            total_exception_count += ex.count;
+            info.last_exception_per_replica.push_back(ex);
+        }
+        info.exception_count = total_exception_count;
+
         infos.emplace_back(std::move(info));
     }
 
@@ -625,6 +383,15 @@ void ExportPartitionManifestUpdatingTask::poll()
 
             const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json);
 
+            /// Read last_exception leaves (no watch). Surfacing exceptions in the system table relies
+            /// on this read being part of every poll cycle: per-part failures during PENDING do not
+            /// trigger a status watch, so the only refresh path while the task is still in-flight is
+            /// the periodic poll. An empty result collapses every "nothing actionable" case
+            /// (transient ZK error, no children, all leaves ZNONODE/malformed) into a no-op so the
+            /// in-memory copy stays intact.
+            auto last_exception_per_replica = readLastExceptionPerReplica(
+                zk, fs::path(entry_path), key, storage.log.load());
+
             const auto local_entry = entries_by_key.find(key);
 
             /// If the zk entry has been replaced with export_merge_tree_partition_force_export, checking only for the export key is not enough
             bool has_local_entry_and_is_up_to_date = local_entry != entries_by_key.end()
                 && local_entry->manifest.transaction_id == metadata.transaction_id;
 
-            /// If the entry is up to date and we don't have the cleanup lock, early exit, nothing to be done.
+            /// If the entry is up to date and we don't have the cleanup lock, refresh the in-memory
+            /// last_exception (surfaced by system.replicated_partition_exports) and early exit.
+            /// Direct mutation of the `mutable` field is safe under export_merge_tree_partition_mutex,
+            /// which is held throughout poll().
             if (!cleanup_lock && has_local_entry_and_is_up_to_date)
+            {
+                if (!last_exception_per_replica.empty())
+                    local_entry->last_exception_per_replica = std::move(last_exception_per_replica);
                 continue;
+            }
 
             std::weak_ptr weak_manifest_updater = storage.export_merge_tree_partition_manifest_updater;
@@ -687,11 +461,15 @@ void ExportPartitionManifestUpdatingTask::poll()
 
             if (has_local_entry_and_is_up_to_date)
             {
+                /// Same refresh as the early-exit branch above; we also reach this point when
+                /// holding the cleanup lock (cleanup did not consume the entry).
+                if (!last_exception_per_replica.empty())
+                    local_entry->last_exception_per_replica = std::move(last_exception_per_replica);
                 LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key);
                 continue;
             }
 
-            addTask(metadata, *status, key, entries_by_key);
+            addTask(metadata, *status, std::move(last_exception_per_replica), key, entries_by_key);
         }
 
         /// Remove entries that were deleted by someone else
@@ -705,6 +483,7 @@ void ExportPartitionManifestUpdatingTask::addTask(
 void ExportPartitionManifestUpdatingTask::addTask(
     const ExportReplicatedMergeTreePartitionManifest & metadata,
     ExportReplicatedMergeTreePartitionTaskEntry::Status status,
+    std::map<String, LastExceptionEntry> last_exception_per_replica,
    const std::string & key,
     auto & entries_by_key
 )
@@ -714,7 +493,7 @@ void ExportPartitionManifestUpdatingTask::addTask(
     /// If the status is PENDING, we grab references to the data parts to prevent them from being deleted from the disk
     /// Otherwise, the operation has already been completed and there is no need to keep the data parts alive
     /// You might also ask: why bother adding tasks that have already been completed (i.e., status != PENDING)?
-    /// The reason is the `replicated_partition_exports` table in the local only mode might miss entries if they are not added here.
+    /// The reason is the `replicated_partition_exports` table might miss entries if they are not added here.
     if (status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
     {
         for (const auto & part_name : metadata.parts)
@@ -727,7 +506,7 @@ void ExportPartitionManifestUpdatingTask::addTask(
     }
 
     /// Insert or update entry. The multi_index container automatically maintains both indexes.
-    auto entry = ExportReplicatedMergeTreePartitionTaskEntry {metadata, status, std::move(part_references)};
+    ExportReplicatedMergeTreePartitionTaskEntry entry {metadata, status, std::move(part_references), std::move(last_exception_per_replica)};
     auto it = entries_by_key.find(key);
     if (it != entries_by_key.end())
         entries_by_key.replace(it, entry);
@@ -826,6 +605,19 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges()
         LOG_INFO(storage.log, "ExportPartition Manifest Updating task: status changed for task {}. New status: {}", key, magic_enum::enum_name(*new_status).data());
 
+        /// Refresh last_exception leaves too. Status transitions to FAILED (via commit budget)
+        /// and KILLED (via timeout) atomically write a per-replica leaf in the same multi, so
+        /// reading them here ensures the system table surfaces the cause together with the
+        /// visible state change. No new watch is added — this piggybacks on the existing
+        /// status watch. An empty result means "nothing actionable" and leaves the previous
+        /// snapshot intact.
+        if (auto fetched = readLastExceptionPerReplica(
+                zk, fs::path(storage.zookeeper_path) / "exports" / key, key, storage.log.load());
+            !fetched.empty())
+        {
+            it->last_exception_per_replica = std::move(fetched);
+        }
+
         /// If status changed to KILLED, cancel local export operations
         if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED)
         {
diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h
index 855ecc334c09..32487f2dc68c 100644
--- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h
+++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h
@@ -23,16 +23,17 @@ class ExportPartitionManifestUpdatingTask
 
     void addStatusChange(const std::string & key);
 
+    /// Returns a snapshot of every replicated partition export task tracked by this
+    /// replica's in-memory mirror. No ZooKeeper traffic; safe to call from query threads.
     std::vector<ReplicatedPartitionExportInfo> getPartitionExportsInfo() const;
 
-    std::vector<ReplicatedPartitionExportInfo> getPartitionExportsInfoLocal() const;
-
 private:
     StorageReplicatedMergeTree & storage;
 
     void addTask(
         const ExportReplicatedMergeTreePartitionManifest & metadata,
         ExportReplicatedMergeTreePartitionTaskEntry::Status status,
+        std::map<String, LastExceptionEntry> last_exception_per_replica,
         const std::string & key,
         auto & entries_by_key
     );
diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
index 96cc648ffbb5..b2abcee8f0ce 100644
--- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
+++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -331,10 +331,14 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(
         /// Bump commit-attempts counter; transition to FAILED once the budget is exhausted.
         /// Prevents the task from remaining stuck in PENDING if commit() fails persistently
         /// (e.g. schema/spec mismatch, prolonged destination outage).
+        /// The exception is recorded in the per-replica last_exception leaf via appendExceptionOps
+        /// inside the same multi as the commit_attempts bump and the (possible) FAILED set.
         const bool became_failed = ExportPartitionUtils::handleCommitFailure(
             zk,
             export_path,
             manifest.max_retries,
+            storage.replica_name,
+            e.message(),
             storage.log.load());
 
         if (became_failed)
diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp
index da11b5a11bbd..81df09c86523 100644
--- a/src/Storages/MergeTree/ExportPartitionUtils.cpp
+++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp
@@ -2,6 +2,7 @@
 #include
 #include
 #include
+#include <Common/escapeForFileName.h>
 #include
 #include "Storages/ExportReplicatedMergeTreePartitionManifest.h"
 #include "Storages/ExportReplicatedMergeTreePartitionTaskEntry.h"
@@ -22,7 +23,6 @@ namespace ProfileEvents
     extern const Event ExportPartitionZooKeeperGetChildren;
     extern const Event ExportPartitionZooKeeperSet;
     extern const Event ExportPartitionZooKeeperMulti;
-    extern const Event ExportPartitionZooKeeperExists;
 }
 
 namespace DB
@@ -217,6 +217,8 @@ namespace ExportPartitionUtils
         const zkutil::ZooKeeperPtr & zk,
         const std::string & entry_path,
         size_t max_attempts,
+        const std::string & replica_name,
+        const std::string & exception_message,
         const LoggerPtr & log)
     {
         const std::string status_path = fs::path(entry_path) / "status";
@@ -257,11 +259,15 @@
         Coordination::Requests ops;
 
+        /// Record the exception in the same multi as the commit-attempts bump and the
+        /// (possible) FAILED transition, so the user-visible last_exception znode is
+        /// updated atomically with the state change that exposes it.
+        appendExceptionOps(ops, zk, fs::path(entry_path), replica_name, /*part_name=*/"", exception_message, log);
+
         /// Bump the global commit_attempts counter (shared across replicas).
-        /// Non-atomic get+set(-1), matching exceptions_per_replica/count semantics.
-        /// Under a race, two replicas may see the same value and write the same +1,
-        /// under-counting by one. FAILED then fires one retry later than the threshold,
-        /// which is acceptable (we always converge to FAILED, never "never").
+        /// Non-atomic get+set(-1). Under a race, two replicas may see the same value
+        /// and write the same +1, under-counting by one. FAILED then fires one retry
+        /// later than the threshold, which is acceptable.
         const std::string commit_attempts_path = fs::path(entry_path) / "commit_attempts";
 
         size_t attempts = 0;
@@ -336,42 +342,42 @@
         const std::string & exception_message,
         const LoggerPtr & log)
     {
-        const auto exceptions_per_replica_path = entry_path / "exceptions_per_replica" / replica_name;
-        const auto count_path = exceptions_per_replica_path / "count";
-        const auto last_exception_path = exceptions_per_replica_path / "last_exception";
+        /// Per-replica leaf under the `last_exception/` container created at task setup.
+        /// Each replica only ever writes its own leaf, so cross-replica updates never
+        /// race on the count. Concurrent writers within the same replica still race
+        /// on read+1+write (best-effort), matching the documented column semantics.
+ const auto last_exception_path + = entry_path / "last_exception" / escapeForFileName(replica_name); + + LastExceptionEntry entry; + std::string current_data; ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists); - if (zk->exists(exceptions_per_replica_path)) + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + const bool leaf_exists = zk->tryGet(last_exception_path, current_data); + if (leaf_exists) { - LOG_INFO(log, "ExportPartition: Exceptions per replica path exists, no need to create it"); - std::string num_exceptions_string; - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - if (zk->tryGet(count_path, num_exceptions_string)) + try { - const auto num_exceptions = parse(num_exceptions_string) + 1; - ops.emplace_back(zkutil::makeSetRequest(count_path, std::to_string(num_exceptions), -1)); + entry = LastExceptionEntry::fromJsonString(current_data); } - else + catch (...) { - /// TODO maybe we should find a better way to handle this case, not urgent - LOG_INFO(log, "ExportPartition: Failed to get number of exceptions, will not increment it"); + LOG_WARNING(log, "ExportPartition: last_exception JSON at {} is malformed, resetting", last_exception_path.string()); + entry = LastExceptionEntry{}; } - - ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1)); - ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "exception", exception_message, -1)); } + + entry.message = exception_message; + entry.part = part_name; + entry.replica = replica_name; + entry.time = ::time(nullptr); + entry.count += 1; + + if (leaf_exists) + ops.emplace_back(zkutil::makeSetRequest(last_exception_path, entry.toJsonString(), -1)); else - { - LOG_INFO(log, "ExportPartition: Exceptions per replica path does not exist, will create it"); - ops.emplace_back(zkutil::makeCreateRequest(exceptions_per_replica_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(count_path, "1", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path, "", zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "part", part_name, zkutil::CreateMode::Persistent)); - ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "exception", exception_message, zkutil::CreateMode::Persistent)); - } + ops.emplace_back(zkutil::makeCreateRequest(last_exception_path, entry.toJsonString(), zkutil::CreateMode::Persistent)); } #if USE_AVRO diff --git a/src/Storages/MergeTree/ExportPartitionUtils.h b/src/Storages/MergeTree/ExportPartitionUtils.h index 411f3b5224be..d7bb83224755 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.h +++ b/src/Storages/MergeTree/ExportPartitionUtils.h @@ -47,31 +47,33 @@ namespace ExportPartitionUtils ); /// Handles a commit-phase failure for a replicated partition export: + /// - records the exception via appendExceptionOps in the same multi /// - increments /commit_attempts (lazy-created) /// - sets /status to FAILED once attempts >= max_attempts /// - /// The counter is a best-effort, non-atomic get+set(-1), matching - /// exceptions_per_replica/count. Concurrent failing commits may under-count by one - /// (FAILED may fire one retry later than the threshold), which is acceptable. 
- /// - /// `replica_name` and `exception` are currently unused and reserved for future - /// integration with per-replica diagnostics. + /// The counter is a best-effort, non-atomic get+set(-1). Concurrent failing + /// commits may under-count by one (FAILED may fire one retry later than the + /// threshold), which is acceptable. /// /// Returns true if this call transitioned the task to FAILED. bool handleCommitFailure( const zkutil::ZooKeeperPtr & zk, const std::string & entry_path, size_t max_attempts, + const std::string & replica_name, + const std::string & exception_message, const LoggerPtr & log); - /// Appends ZK ops to `ops` that record a per-replica exception under - /// /exceptions_per_replica//last_exception/{exception,part} - /// and increment /exceptions_per_replica//count, - /// creating the subtree if absent. + /// Appends a single ZK op to `ops` that writes the per-replica leaf + /// /last_exception/ + /// with a JSON-encoded LastExceptionEntry containing the message, part, + /// replica, time, and an incremented count. If the leaf does not yet exist + /// the op is a Create; otherwise it is a Set with version -1. /// - /// The count increment is non-atomic (synchronous tryGet + set with version -1). - /// Concurrent failing writers may under-count by one, which is accepted in this - /// subsystem and matches the pre-existing behaviour. + /// Cross-replica updates do not race: each replica only writes its own + /// leaf. Within a single replica the count increment is best-effort and + /// non-atomic (synchronous tryGet + Set with version -1); concurrent + /// failing writers may under-count by one, which is accepted. /// /// Intended to be combined with additional ops (for example a version-guarded /// status set) and executed as a single `tryMulti` so the exception record and diff --git a/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp b/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp index c9e3ffd9eef9..c4669a9bf55c 100644 --- a/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp +++ b/src/Storages/MergeTree/tests/gtest_export_partition_ordering.cpp @@ -43,9 +43,9 @@ TEST_F(ExportPartitionOrderingTest, IterationOrderMatchesCreateTime) manifest3.transaction_id = "tx3"; manifest3.create_time = base_time; // Oldest - ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}}; - ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}}; - ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry1{manifest1, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry2{manifest2, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; + ExportReplicatedMergeTreePartitionTaskEntry entry3{manifest3, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, {}, {}}; // Insert in reverse order by_key.insert(entry1); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 38ffa27c97b1..40d720293b3a 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4515,14 +4515,9 @@ void StorageReplicatedMergeTree::exportMergeTreePartitionStatusHandlingTask() } } -std::vector 
StorageReplicatedMergeTree::getPartitionExportsInfo(bool prefer_remote_information) const +std::vector StorageReplicatedMergeTree::getPartitionExportsInfo() const { - if (prefer_remote_information && getZooKeeper()->isFeatureEnabled(DB::KeeperFeatureFlag::MULTI_READ)) - { - return export_merge_tree_partition_manifest_updater->getPartitionExportsInfo(); - } - - return export_merge_tree_partition_manifest_updater->getPartitionExportsInfoLocal(); + return export_merge_tree_partition_manifest_updater->getPartitionExportsInfo(); } StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMergeParts( @@ -8318,9 +8313,11 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.toJsonString(), zkutil::CreateMode::Persistent)); + /// Container for per-replica last_exception leaves; children are created lazily by the + /// first writer per replica (see ExportPartitionUtils::appendExceptionOps). ops.emplace_back(zkutil::makeCreateRequest( - fs::path(partition_exports_path) / "exceptions_per_replica", - "", + fs::path(partition_exports_path) / "last_exception", + "", zkutil::CreateMode::Persistent)); ops.emplace_back(zkutil::makeCreateRequest( diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 04a9e72e5513..afac526bcd58 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -378,7 +378,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData using ShutdownDeadline = std::chrono::time_point; void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline); - std::vector getPartitionExportsInfo(bool prefer_remote_information) const; + std::vector getPartitionExportsInfo() const; private: std::atomic_bool are_restoring_replica {false}; diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp index e088e4f77214..9e8faab689d6 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.cpp @@ -6,23 +6,28 @@ #include #include #include +#include #include #include #include "Columns/ColumnString.h" #include "Storages/VirtualColumnUtils.h" -#include namespace DB { -namespace Setting -{ - extern const SettingsBool export_merge_tree_partition_system_table_prefer_remote_information; -} - ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescription() { + auto last_exception_tuple = std::make_shared( + DataTypes{ + std::make_shared(), + std::make_shared(), + std::make_shared(), + std::make_shared(), + std::make_shared(), + }, + Names{"replica", "message", "part", "time", "count"}); + return ColumnsDescription { {"source_database", std::make_shared(), "Name of the source database."}, @@ -38,10 +43,10 @@ ColumnsDescription StorageSystemReplicatedPartitionExports::getColumnsDescriptio {"parts_count", std::make_shared(), "Number of parts in the export."}, {"parts_to_do", std::make_shared(), "Number of parts pending to be exported."}, {"status", std::make_shared(), "Status of the export."}, - {"exception_replica", std::make_shared(), "Replica that caused the last exception"}, - {"last_exception", std::make_shared(), "Last exception message of any part (not necessarily the last global exception)"}, - {"exception_part", std::make_shared(), "Part that caused the last exception"}, - {"exception_count", std::make_shared(), "Number of 
global exceptions"}, + {"last_exception_per_replica", std::make_shared(last_exception_tuple), + "Per-replica last exception entries. Each tuple records the most recent exception observed by that replica plus a best-effort within-replica count. Empty array if no replica has reported an exception for this task."}, + {"exception_count", std::make_shared(), + "Sum of per-replica exception counts. Each replica owns its own count, so the sum is exact w.r.t. the in-memory snapshot; within-replica updates remain best-effort and may under-count by one under concurrent failures."}, }; } @@ -117,7 +122,7 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu { const IStorage * storage = replicated_merge_tree_tables[database][table].get(); if (const auto * replicated_merge_tree = dynamic_cast(storage)) - partition_exports_info = replicated_merge_tree->getPartitionExportsInfo(context->getSettingsRef()[Setting::export_merge_tree_partition_system_table_prefer_remote_information]); + partition_exports_info = replicated_merge_tree->getPartitionExportsInfo(); } for (const ReplicatedPartitionExportInfo & info : partition_exports_info) @@ -135,14 +140,17 @@ void StorageSystemReplicatedPartitionExports::fillData(MutableColumns & res_colu Array parts_array; parts_array.reserve(info.parts.size()); for (const auto & part : info.parts) - parts_array.push_back(part); + parts_array.push_back(part); res_columns[i++]->insert(parts_array); res_columns[i++]->insert(info.parts_count); res_columns[i++]->insert(info.parts_to_do); res_columns[i++]->insert(info.status); - res_columns[i++]->insert(info.exception_replica); - res_columns[i++]->insert(info.last_exception); - res_columns[i++]->insert(info.exception_part); + + Array per_replica; + per_replica.reserve(info.last_exception_per_replica.size()); + for (const auto & ex : info.last_exception_per_replica) + per_replica.push_back(Tuple{ex.replica, ex.message, ex.part, ex.time, ex.count}); + res_columns[i++]->insert(per_replica); res_columns[i++]->insert(info.exception_count); } } diff --git a/src/Storages/System/StorageSystemReplicatedPartitionExports.h b/src/Storages/System/StorageSystemReplicatedPartitionExports.h index 15eb54f38c0e..a8666374a7f0 100644 --- a/src/Storages/System/StorageSystemReplicatedPartitionExports.h +++ b/src/Storages/System/StorageSystemReplicatedPartitionExports.h @@ -1,5 +1,6 @@ #pragma once +#include #include namespace DB @@ -20,9 +21,13 @@ struct ReplicatedPartitionExportInfo size_t parts_to_do; std::vector parts; String status; - std::string exception_replica; - std::string last_exception; - std::string exception_part; + /// One entry per replica that has recorded at least one exception for this task. + /// Sourced verbatim from the in-memory mirror; no ZooKeeper traffic. + std::vector last_exception_per_replica; + /// Sum of per-replica counts. Each replica owns its own count, so cross-replica + /// updates do not race; the sum is exact w.r.t. the in-memory snapshot. Within a + /// single replica the count is best-effort (concurrent failing writers may under- + /// count by one), matching the documented column semantics. 
     size_t exception_count = 0;
 };
 
diff --git a/tests/integration/helpers/export_partition_helpers.py b/tests/integration/helpers/export_partition_helpers.py
index d6bf78df0998..46e73e8a04e6 100644
--- a/tests/integration/helpers/export_partition_helpers.py
+++ b/tests/integration/helpers/export_partition_helpers.py
@@ -86,10 +86,20 @@ def wait_for_exception_count(
     dest_table,
     partition_id,
     min_exception_count=1,
-    timeout=30,
+    timeout=60,
     poll_interval=0.5,
 ):
-    """Wait for exception_count to reach at least *min_exception_count*."""
+    """Wait for exception_count to reach at least *min_exception_count*.
+
+    The default timeout is intentionally larger than one manifest-updater poll
+    cycle (~30s, see StorageReplicatedMergeTree::exportMergeTreePartitionUpdatingTask).
+    system.replicated_partition_exports is served from the in-memory mirror, which
+    is refreshed on (a) the periodic poll tick and (b) status changes. While the
+    task is still PENDING (e.g. transient part-export failures with a generous
+    max_retries), no status watch fires, so newly written per-replica exception
+    leaves only become visible on the next poll. Allow at least one full cycle
+    plus headroom so the test is not racing the cadence.
+    """
     start_time = time.time()
     last_exception_count = None
     while time.time() - start_time < timeout:
@@ -98,7 +108,6 @@ def wait_for_exception_count(
             f" WHERE source_table = '{source_table}'"
             f" AND destination_table = '{dest_table}'"
             f" AND partition_id = '{partition_id}'"
-            f" SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1"
         ).strip()
 
         if exception_count_str:
diff --git a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py
index af6ca75bafd7..58e019e407a6 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_iceberg/test.py
@@ -242,7 +242,6 @@ def test_failure_is_logged_in_system_table(cluster):
         WHERE source_table = '{mt_table}'
         AND destination_table = '{iceberg_table}'
         AND partition_id = '2020'
-        SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1
         """
     ).strip()
     assert status == "FAILED", f"Expected FAILED status, got: {status!r}"
@@ -253,7 +252,6 @@ def test_failure_is_logged_in_system_table(cluster):
         WHERE source_table = '{mt_table}'
         AND destination_table = '{iceberg_table}'
         AND partition_id = '2020'
-        SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1
         """
     ).strip())
     assert exception_count > 0, "Expected non-zero exception_count in system.replicated_partition_exports"
@@ -321,7 +319,6 @@ def test_inject_short_living_failures(cluster):
         WHERE source_table = '{mt_table}'
         AND destination_table = '{iceberg_table}'
         AND partition_id = '2020'
-        SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1
         """
     ).strip())
     assert exception_count >= 1, "Expected at least one transient exception to be recorded"
@@ -857,23 +854,31 @@ def test_export_task_timeout_kills_stuck_pending_task(cluster):
             timeout=90,
         )
 
-        # TODO: system.replicated_partition_exports does not currently surface
-        # last_exception / exception_count reliably (the engine's aggregation
-        # from exceptions_per_replica is incomplete). Read the raw znode via
-        # system.zookeeper until that is fixed.
-        export_key = f"2020_default.{iceberg_table}"
-        last_exception_path = (
-            f"/clickhouse/tables/{mt_table}/exports/{export_key}"
-            f"/exceptions_per_replica/replica1/last_exception"
-        )
-        last_exception = node.query(
-            f"""
-            SELECT value FROM system.zookeeper
-            WHERE path = '{last_exception_path}' AND name = 'exception'
-            """
-        ).strip()
+        # The KILL transition writes a per-replica last_exception leaf in the same
+        # ZK multi as the status flip; handleStatusChanges then mirrors it into
+        # memory together with the status. Poll briefly to allow that watch ->
+        # mirror hop. We use arrayJoin to flatten the per-replica array column;
+        # any replica reporting the timeout reason is sufficient.
+        deadline = time.time() + 30
+        last_exception = ""
+        while time.time() < deadline:
+            last_exception = node.query(
+                f"""
+                SELECT arrayStringConcat(
+                    arrayMap(x -> x.message, last_exception_per_replica),
+                    '\\n'
+                )
+                FROM system.replicated_partition_exports
+                WHERE source_table = '{mt_table}'
+                AND destination_table = '{iceberg_table}'
+                AND partition_id = '2020'
+                """
+            ).strip()
+            if "timed out" in last_exception:
+                break
+            time.sleep(0.5)
+
         assert "timed out" in last_exception, (
-            f"Expected last_exception znode to mention the timeout reason, got: {last_exception!r}"
+            f"Expected last_exception_per_replica column to mention the timeout reason, got: {last_exception!r}"
         )
     finally:
         node.query("SYSTEM DISABLE FAILPOINT export_partition_commit_always_throw")
diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
index 82efc6765990..97703065471f 100644
--- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
+++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py
@@ -228,18 +228,15 @@ def test_restart_nodes_during_export(cluster):
 
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2021") != f'0\n', "Export of partition 2021 did not resume after crash"
 
-@pytest.mark.parametrize(
-    "system_table_prefer_remote_information", ['0', '1']
-)
-def test_kill_export(cluster, system_table_prefer_remote_information):
+def test_kill_export(cluster):
     skip_if_remote_database_disk_enabled(cluster)
     node = cluster.instances["replica1"]
     node2 = cluster.instances["replica2"]
     watcher_node = cluster.instances["watcher_node"]
 
     postfix = str(uuid.uuid4()).replace("-", "_")
-    mt_table = f"kill_export_mt_table_{system_table_prefer_remote_information}_{postfix}"
-    s3_table = f"kill_export_s3_table_{system_table_prefer_remote_information}_{postfix}"
+    mt_table = f"kill_export_mt_table_{postfix}"
+    s3_table = f"kill_export_s3_table_{postfix}"
 
     create_tables_and_insert_data(node, mt_table, s3_table, "replica1")
     create_tables_and_insert_data(node2, mt_table, s3_table, "replica2")
@@ -316,8 +313,8 @@ def test_kill_export(cluster, system_table_prefer_remote_information):
     assert node.query(f"SELECT count() FROM s3(s3_conn, filename='{s3_table}/commit_2021_*', format=LineAsString)") != f'0\n', "Partition 2021 was not written to S3, but it should have been"
 
     # check system.replicated_partition_exports for the export, status should be KILLED
-    assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}' SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = {system_table_prefer_remote_information}") == 'KILLED\n', "Partition 2020 was not killed as expected"
-    assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2021' and source_table = '{mt_table}' and destination_table = '{s3_table}' SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = {system_table_prefer_remote_information}") == 'COMPLETED\n', "Partition 2021 was not completed, this is unexpected"
+    assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'") == 'KILLED\n', "Partition 2020 was not killed as expected"
+    assert node.query(f"SELECT status FROM system.replicated_partition_exports WHERE partition_id = '2021' and source_table = '{mt_table}' and destination_table = '{s3_table}'") == 'COMPLETED\n', "Partition 2021 was not completed, this is unexpected"
 
     # check the data did not land on s3
     assert node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020") == '0\n', "Partition 2020 was written to S3, it was not killed as expected"
@@ -376,14 +373,12 @@ def test_kill_export_resilient_to_status_handling_failure(cluster):
     # Wait up to 15 s (5 s retry delay + margin) for the kill to propagate.
     wait_for_export_status(node, mt_table, s3_table, "2020", "KILLED", timeout=15)
 
-    # query the local status export_merge_tree_partition_system_table_prefer_remote_information=0
     assert (
         node.query(
             f"SELECT status FROM system.replicated_partition_exports"
             f" WHERE partition_id = '2020'"
             f" AND source_table = '{mt_table}'"
             f" AND destination_table = '{s3_table}'"
-            f" SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 0"
         ).strip()
         == "KILLED"
     ), "Export was not killed — status change was lost after the injected failure"
@@ -537,7 +532,6 @@ def test_failure_is_logged_in_system_table(cluster):
         WHERE source_table = '{mt_table}'
         AND destination_table = '{s3_table}'
         AND partition_id = '2020'
-        SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1
         """
     )
 
@@ -549,7 +543,6 @@ def test_failure_is_logged_in_system_table(cluster):
         WHERE source_table = '{mt_table}'
         AND destination_table = '{s3_table}'
         AND partition_id = '2020'
-        SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1
         """
    )
     assert int(exception_count.strip()) > 0, "Expected non-zero exception_count in system.replicated_partition_exports"
@@ -600,8 +593,11 @@ def test_inject_short_living_failures(cluster):
         f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} SETTINGS export_merge_tree_partition_max_retries=100;"
     )
 
-    # wait for at least one exception to occur, but not enough to finish the export
-    wait_for_exception_count(node, mt_table, s3_table, "2020", min_exception_count=1, timeout=30)
+    # wait for at least one exception to occur, but not enough to finish the export.
+    # Use the helper default (>= one manifest-updater poll cycle): system.replicated_partition_exports
+    # is served from the in-memory mirror, and while the task stays PENDING the mirror only
+    # picks up new exception leaves on the next poll tick (~30s) — see helper docstring.
+    wait_for_exception_count(node, mt_table, s3_table, "2020", min_exception_count=1)
 
     # wait for the export to finish
     wait_for_export_status(node, mt_table, s3_table, "2020", "COMPLETED")
@@ -626,7 +622,6 @@ def test_inject_short_living_failures(cluster):
         WHERE source_table = '{mt_table}'
         AND destination_table = '{s3_table}'
         AND partition_id = '2020'
-        SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1
         """
     )
     assert int(exception_count.strip()) >= 1, "Expected at least one exception"