Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions antalya/docs/design/alter-table-export-part-partition.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,13 @@ The following notes expand on expected behavior of commands.
every active part of partition `p` across all replicas that host
it; `system.replicated_partition_exports` converges to `COMPLETED`.

4. Re-issuing the same `EXPORT PARTITION` within
`export_merge_tree_partition_manifest_ttl` is a no-op (no
duplicate files) unless `export_merge_tree_partition_force_export = 1`. This
behavior avoids accidentally exporting the same data twice. Note, however
that forcing the operation is dangerous if ClickHouse can't clean up the
previous operation. In this case you'll potentially commit files twice.
4. Re-issuing the same `EXPORT PARTITION` is rejected (no duplicate
files) unless `export_merge_tree_partition_force_export = 1`. Export
entries are kept indefinitely in `system.replicated_partition_exports`
as history; they never expire. This behavior avoids accidentally
exporting the same data twice. Note, however that forcing the
operation is dangerous if ClickHouse can't clean up the previous
operation. In this case you'll potentially commit files twice.

5. Killing an in-flight partition export via `KILL EXPORT PARTITION`
transitions status to `KILLED` and stops all replicas' contributions.
Expand Down Expand Up @@ -481,7 +482,6 @@ The following notes expand on expected behavior of commands.
| `export_merge_tree_part_filename_pattern` | query | `{part_name}_{checksum}` | `String` | both | Filename template; supports `{part_name}`, `{checksum}`, `{database}`, `{table}`, server macros. |
| `export_merge_tree_partition_force_export` | query | `false` | `Bool` | `EXPORT PARTITION` | Overwrite a live Keeper manifest for the same `(source, destination, partition_id)`. Dangerous — can produce duplicate data on the destination; use with caution. |
| `export_merge_tree_partition_max_retries` | query | `3` | `UInt64` | `EXPORT PARTITION` | Retry budget applied to both per-part export attempts and per-task commit attempts (Iceberg). The task fails terminally if commit retries alone exceed the budget. |
| `export_merge_tree_partition_manifest_ttl` | query | `180` (seconds) | `UInt64` | `EXPORT PARTITION` | Live-manifest TTL; acts as the idempotency window. Does not interrupt in-flight tasks. Keep this greater than `export_merge_tree_partition_task_timeout_seconds` if you want the `KILLED` entry to remain visible in `system.replicated_partition_exports` after the timeout fires. |
| `export_merge_tree_partition_task_timeout_seconds` | query | `3600` (seconds) | `UInt64` (`0`=disable) | `EXPORT PARTITION` | Wall-clock cap for `PENDING` tasks; on expiry transitions to `KILLED` with a timeout reason. Measured from manifest `create_time`. Enforcement latency ≈ one manifest-updater poll cycle (~30s) plus Keeper watch propagation. |
| `export_merge_tree_partition_system_table_prefer_remote_information` | query | `false` | `Bool` | `EXPORT PARTITION` | When `true`, `system.replicated_partition_exports` fetches fresh state from Keeper (requires the `MULTI_READ` feature flag); when `false`, uses local cached state. **Default flipped from `true` to `false` in this release** — Keeper round-trips were more expensive than warranted for the typical observability workload. (See NOTE 2.)|
| `export_merge_tree_part_file_already_exists_policy` | query | `skip` | `skip` / `error` / `overwrite` | `EXPORT PARTITION` | Per-file policy during partition export. |
Expand Down Expand Up @@ -722,7 +722,8 @@ peak memory. Hot path is the Parquet encoder, which warrants a guard against reg
(`parts_to_do > 0`) rather than corrupt data. Acceptable but must be documented in the
upgrade notes.
- **Risk (object-storage cost / accidental large exports):** mitigated by the experimental
gates (default off) and the manifest idempotency window.
gates (default off) and the duplicate-export rejection (an existing export key is refused
unless `export_merge_tree_partition_force_export` is set).
- **Risk (Iceberg catalog manifest retention):** if the catalog reaps old manifest files
with a retention window shorter than `export_merge_tree_partition_task_timeout_seconds`,
the rare "sole-node commits, crashes, recovers after reaper deleted the commit manifest"
Expand Down
9 changes: 1 addition & 8 deletions docs/en/antalya/partition_export.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,14 @@ TO TABLE [destination_database.]destination_table

- **Type**: `Bool`
- **Default**: `false`
- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition to the same destination before the manifest expires. **IMPORTANT:** this is dangerous because it can lead to duplicated data, use it with caution.
- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition that was already exported to the same destination. **IMPORTANT:** this is dangerous because it can lead to duplicated data, use it with caution.

#### `export_merge_tree_partition_max_retries` (Optional)

- **Type**: `UInt64`
- **Default**: `3`
- **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If it exceeds, the entire task fails.

#### `export_merge_tree_partition_manifest_ttl` (Optional)

- **Type**: `UInt64`
- **Default**: `180` (seconds)
- **Description**: Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination. This setting does not affect or delete in-progress tasks; it only cleans up completed ones.

#### `export_merge_tree_part_file_already_exists_policy` (Optional)

- **Type**: `MergeTreePartExportFileAlreadyExistsPolicy`
Expand Down Expand Up @@ -109,7 +103,6 @@ When the timeout is exceeded the task transitions to KILLED (same terminal state

Notes:
- Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation.
- Since both this timeout and `export_merge_tree_partition_manifest_ttl` are measured from `create_time`, keep `export_merge_tree_partition_manifest_ttl` greater than `export_merge_tree_partition_task_timeout_seconds` if you want the KILLED entry to remain visible in `system.replicated_partition_exports` after the timeout fires.

## Examples

Expand Down
6 changes: 1 addition & 5 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7557,10 +7557,6 @@ Ignore existing partition export and overwrite the zookeeper entry
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_max_retries, 3, R"(
Maximum number of retries for exporting a merge tree part in an export partition task
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_manifest_ttl, 86400, R"(
Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination.
This setting does not affect / delete in progress tasks. It'll only cleanup the completed ones.
)", 0) \
DECLARE(UInt64, export_merge_tree_partition_task_timeout_seconds, 86400, R"(
Maximum wall-clock duration (in seconds) an export partition task is allowed to remain in the PENDING state before it is auto-killed by the background cleanup loop.
Expand All @@ -7573,7 +7569,6 @@ In such scenario, ClickHouse would attempt to commit those files again producing

Notes:
- Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation.
- Since both this timeout and `export_merge_tree_partition_manifest_ttl` are measured from `create_time`, keep `export_merge_tree_partition_manifest_ttl` greater than `export_merge_tree_partition_task_timeout_seconds` if you want the KILLED entry to remain visible in `system.replicated_partition_exports` after the timeout fires.
)", 0) \
DECLARE(MergeTreePartExportFileAlreadyExistsPolicy, export_merge_tree_part_file_already_exists_policy, MergeTreePartExportFileAlreadyExistsPolicy::skip, R"(
Possible values:
Expand Down Expand Up @@ -7943,6 +7938,7 @@ Maximum number of WebAssembly UDF instances that can run in parallel per functio

#define OBSOLETE_SETTINGS(M, ALIAS) \
/** Obsolete settings which are kept around for compatibility reasons. They have no effect anymore. */ \
MAKE_OBSOLETE(M, UInt64, export_merge_tree_partition_manifest_ttl, 86400) \
MAKE_OBSOLETE(M, Bool, query_condition_cache_store_conditions_as_plaintext, false) \
MAKE_OBSOLETE(M, Bool, update_insert_deduplication_token_in_dependent_materialized_views, 0) \
MAKE_OBSOLETE(M, UInt64, max_memory_usage_for_all_queries, 0) \
Expand Down
3 changes: 0 additions & 3 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ struct ExportReplicatedMergeTreePartitionManifest
std::vector<String> parts;
time_t create_time;
size_t max_retries;
size_t ttl_seconds;
size_t task_timeout_seconds;
size_t max_threads;
bool parallel_formatting;
Expand Down Expand Up @@ -205,7 +204,6 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("filename_pattern", filename_pattern);
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
json.set("task_timeout_seconds", task_timeout_seconds);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve ttl_seconds in exported manifests

During a rolling upgrade, a newly upgraded replica will now create /exports/.../metadata.json without ttl_seconds, but any still-running pre-change replica deserializes every export manifest with json->getValue<size_t>("ttl_seconds") in ExportReplicatedMergeTreePartitionManifest::fromJsonString. That makes those replicas throw an exception while polling/scheduling/displaying/killing the export, so mixed-version clusters can leave newly created exports unprocessed or invisible until every replica is upgraded. Please keep writing a dummy ttl_seconds field for the compatibility window while ignoring it in the new code.

Useful? React with 👍 / 👎.

json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
Expand Down Expand Up @@ -240,7 +238,6 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.parts.push_back(parts_array->getElement<String>(static_cast<unsigned int>(i)));

manifest.create_time = json->getValue<time_t>("create_time");
manifest.ttl_seconds = json->getValue<size_t>("ttl_seconds");
manifest.task_timeout_seconds = json->getValue<size_t>("task_timeout_seconds");
manifest.max_threads = json->getValue<size_t>("max_threads");
manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
Expand Down
72 changes: 22 additions & 50 deletions src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,54 +128,35 @@ namespace
}

/*
Remove expired entries and fix non-committed exports that have already exported all parts.

Return values:
- true: the cleanup was successful, the entry is removed from the entries_by_key container and the function returns true. Proceed to the next entry.
- false: the cleanup was not successful, the entry is not removed from the entries_by_key container and the function returns false.
Enforce the PENDING task timeout and recover non-committed exports that have already
exported all parts. Entries are never removed for age — `system.replicated_partition_exports`
is append-only history, so the entry always stays in the in-memory container: a KILLED
transition is driven by the status watch, and a deferred commit is handled by the caller
after the lock is released.

Side outputs:
- `deferred_commits`: when a PENDING entry has all parts processed but the export was
never committed, this function appends a CommitRecoveryWork item to be executed by
the caller after releasing the storage-wide mutex. The actual commit() call (which
performs network I/O to the destination catalog and S3) MUST NOT run under the lock.
The function still returns `false` in that case so the outer poll() loop falls through
to `addTask`, keeping the in-memory entry consistent regardless of whether the
deferred commit ultimately succeeds.
*/
bool tryCleanup(
void tryCleanup(
const zkutil::ZooKeeperPtr & zk,
const std::string & entry_path,
const LoggerPtr & log,
const ContextPtr & storage_context,
StorageReplicatedMergeTree & storage,
const std::string & key,
const ExportReplicatedMergeTreePartitionManifest & metadata,
const time_t now,
const bool is_pending,
auto & entries_by_key,
std::vector<CommitRecoveryWork> & deferred_commits
)
{
bool has_expired = metadata.create_time < now - static_cast<time_t>(metadata.ttl_seconds);

bool task_timed_out = is_pending
&& metadata.task_timeout_seconds > 0
&& metadata.create_time + static_cast<time_t>(metadata.task_timeout_seconds) < now;

if (has_expired && !is_pending)
{
zk->tryRemoveRecursive(fs::path(entry_path));
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemoveRecursive);
auto it = entries_by_key.find(key);
if (it != entries_by_key.end())
entries_by_key.erase(it);
LOG_INFO(log, "ExportPartition Manifest Updating Task: Removed {}: expired", key);

return true;
}
else if (task_timed_out)
if (task_timed_out)
{
const std::string status_path = fs::path(entry_path) / "status";

Expand All @@ -187,14 +168,14 @@ namespace
if (!zk->tryGet(status_path, status_string, &status_stat))
{
LOG_INFO(log, "ExportPartition Manifest Updating Task: Failed to read status for {} while enforcing task timeout, skipping", entry_path);
return false;
return;
}

const auto current_status = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionTaskEntry::Status>(status_string);
if (!current_status || *current_status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
{
LOG_INFO(log, "ExportPartition Manifest Updating Task: Task {} is not PENDING, can't set to KILLED, skipping", entry_path);
return false;
return;
}

const auto timeout_message = fmt::format(
Expand Down Expand Up @@ -234,9 +215,9 @@ namespace
entry_path, rc);
}

/// Return false so the entry remains in entries_by_key; the status watch will drive
/// The entry remains in entries_by_key; the status watch will drive
/// handleStatusChanges -> killExportPart on every replica, mirroring user-initiated KILL.
return false;
return;
}
else if (is_pending)
{
Expand All @@ -249,7 +230,7 @@ namespace
{

LOG_INFO(log, "ExportPartition Manifest Updating Task: Failed to get parts in processing or pending, skipping");
return false;
return;
}

if (parts_in_processing_or_pending.empty())
Expand All @@ -261,7 +242,7 @@ namespace
if (!destination_storage)
{
LOG_INFO(log, "ExportPartition Manifest Updating Task: Failed to reconstruct destination storage: {}, skipping", destination_storage_id.getNameForLogs());
return false;
return;
}

/// A replica exported the last part but the commit never landed. Capture everything
Expand All @@ -270,22 +251,18 @@ namespace
/// MAX_TRANSACTION_RETRIES = 100 retries; holding the storage-wide mutex across
/// that work is what caused `system.replicated_partition_exports` to hang.
///
/// Returning false here keeps the outer poll() loop on the normal path: it will
/// call addTask() so the in-memory container reflects the PENDING entry. The
/// status watch registered by poll() will transition the local entry to
/// COMPLETED/FAILED once the deferred commit (or a peer's commit) updates
/// /status in ZooKeeper.
/// The outer poll() loop stays on the normal path: it will call addTask() so the
/// in-memory container reflects the PENDING entry. The status watch registered by
/// poll() will transition the local entry to COMPLETED/FAILED once the deferred
/// commit (or a peer's commit) updates /status in ZooKeeper.
deferred_commits.push_back(CommitRecoveryWork{
.metadata = metadata,
.entry_path = entry_path,
.destination_storage = destination_storage,
.context = context,
});
return false;
}
}

return false;
}
}

Expand Down Expand Up @@ -365,8 +342,8 @@ void ExportPartitionManifestUpdatingTask::poll()
const std::string cleanup_lock_path = fs::path(storage.zookeeper_path) / "exports_cleanup_lock";

/// The `exports_cleanup_lock` is an ephemeral ZK node that serializes cleanup work
/// across replicas: only the replica holding it walks `tryCleanup` (entry expiry +
/// commit recovery). It MUST outlive the deferred-commit loop below; otherwise a peer
/// across replicas: only the replica holding it walks `tryCleanup` (task-timeout
/// enforcement + commit recovery). It MUST outlive the deferred-commit loop below; otherwise a peer
/// replica's next poll() could acquire it and race us on the same commit-recovery work,
/// duplicating REST-catalog round-trips and snapshot writes. The EphemeralNodeHolder
/// destructor removes the node, so we declare it at function scope and let it die
Expand Down Expand Up @@ -468,25 +445,20 @@ void ExportPartitionManifestUpdatingTask::poll()
continue;
}

/// if we have the cleanup lock, try to cleanup
/// if we successfully cleaned it up, early exit
/// If we hold the cleanup lock, enforce the task timeout and recover uncommitted exports.
/// Entries are never removed here, so we always fall through to refresh / addTask below.
if (cleanup_lock)
{
bool cleanup_successful = tryCleanup(
tryCleanup(
zk,
entry_path,
storage.log.load(),
storage.getContext(),
storage,
key,
metadata,
now,
*status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING,
entries_by_key,
deferred_commits);

if (cleanup_successful)
continue;
}

if (has_local_entry_and_is_up_to_date)
Expand Down
Loading
Loading