26 changes: 18 additions & 8 deletions docs/en/antalya/partition_export.md
@@ -6,7 +6,7 @@ The `ALTER TABLE EXPORT PARTITION` command exports entire partitions from Replic

The set of parts that are exported is based on the list of parts the replica that received the export command sees. The other replicas will assist in the export process if they have those parts locally. Otherwise they will ignore it.

The partition export tasks can be observed through `system.replicated_partition_exports`. Querying this table results in a query to ZooKeeper, so it must be used with care. Individual part export progress can be observed as usual through `system.exports`.
The partition export tasks can be observed through `system.replicated_partition_exports`. The table is served from each replica's in-memory mirror, so queries do not contact ZooKeeper and are cheap to run. The mirror is refreshed on the manifest-updater poll cycle and on every status change, so a freshly written exception or terminal state may take up to one poll interval to appear. Individual part export progress can be observed as usual through `system.exports`.

The same partition cannot be exported to the same destination more than once. There are two ways to override this behavior: either enable the `export_merge_tree_partition_force_export` setting or wait for the task to expire.
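
A minimal sketch of the force-export override, based on the command form described above; the table names and the partition expression are illustrative assumptions, not taken from a real deployment:

```sql
-- Re-export a partition to a destination that already received it.
-- `rmt_table`, `s3_table`, and the partition value '2022' are placeholders.
ALTER TABLE rmt_table
    EXPORT PARTITION '2022' TO TABLE s3_table
    SETTINGS export_merge_tree_partition_force_export = 1;
```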

@@ -105,7 +105,7 @@ TO TABLE [destination_database.]destination_table
- **Type**: `UInt64`
- **Default**: `3600`
- **Description**: The timeout is measured from the manifest's create_time. Set to 0 to disable the timeout.
When the timeout is exceeded the task transitions to KILLED (same terminal state as `KILL QUERY ... EXPORT PARTITION`), and `last_exception` is populated with a timeout reason.
When the timeout is exceeded the task transitions to KILLED (same terminal state as `KILL QUERY ... EXPORT PARTITION`), and a `last_exception_per_replica` entry on the replica that fires the timeout is populated with a timeout reason.

Notes:
- Enforcement is best-effort: actual kill latency is bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper watch propagation.
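
Because enforcement is asynchronous, a timed-out task is easiest to confirm through the system table. A minimal sketch; the column names are those of `system.replicated_partition_exports` as documented here, and the filter value is an assumption:

```sql
-- List killed export tasks together with each replica's last recorded
-- exception, e.g. to find tasks killed by the manifest timeout.
SELECT
    source_table,
    destination_table,
    status,
    last_exception_per_replica
FROM system.replicated_partition_exports
WHERE status = 'KILLED';
```
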
@@ -170,9 +170,7 @@ parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0']
parts_count: 3
parts_to_do: 0
status: COMPLETED
exception_replica:
last_exception:
exception_part:
last_exception_per_replica: []
exception_count: 0

Row 2:
@@ -189,9 +187,7 @@ parts: ['2021_0_0_0']
parts_count: 1
parts_to_do: 0
status: COMPLETED
exception_replica:
last_exception:
exception_part:
last_exception_per_replica: []
exception_count: 0

2 rows in set. Elapsed: 0.019 sec.
@@ -205,6 +201,20 @@ Status values include:
- `FAILED` - Export failed
- `KILLED` - Export was cancelled
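
Since the table is served from the in-memory mirror, it is cheap to poll for terminal failure states. A minimal monitoring sketch, with table and column names taken from the examples in this document:

```sql
-- Surface export tasks that ended in a terminal failure state.
SELECT source_table, destination_table, status, exception_count
FROM system.replicated_partition_exports
WHERE status IN ('FAILED', 'KILLED');
```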

### Exception columns

- `last_exception_per_replica` is an `Array(Tuple(replica String, message String, part String, time DateTime, count UInt64))`. Each tuple is the most recent exception observed by a single replica, plus a best-effort within-replica `count`. Replicas that have never reported an exception are omitted.
- `exception_count` is the sum of every `count` in `last_exception_per_replica`. Each replica owns its own counter, so cross-replica updates do not race; the sum is exact with respect to the returned snapshot. Within a single replica, concurrent failing writers may under-count by one.

To pick the latest exception across replicas:

```sql
SELECT
    arrayReverseSort(x -> x.time, last_exception_per_replica)[1] AS latest_exception
FROM system.replicated_partition_exports
WHERE source_table = 'rmt_table' AND destination_table = 's3_table';
```

## Related Features

- [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated)
4 changes: 0 additions & 4 deletions src/Core/Settings.cpp
@@ -7404,10 +7404,6 @@ Throw an error if there are pending patch parts when exporting a merge tree part
DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"(
Only lock a part when the task is already running. This might help with busy waiting, where the scheduler locks a part but the task ends up in the pending list.
On the other hand, there is a chance that by the time the task executes the part has already been locked by another replica, and the task will simply exit early.
)", 0) \
DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, false, R"(
Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information.
Querying ZooKeeper is expensive, and only available if the ZooKeeper feature flag MULTI_READ is enabled.
)", 0) \
DECLARE(Timezone, iceberg_partition_timezone, "", R"(
Time zone by which partitioning of Iceberg tables was performed.
1 change: 0 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
@@ -262,7 +262,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
{"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
{"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
{"object_storage_cluster", "", "", "Antalya: New setting"},
54 changes: 54 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -60,6 +60,60 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry
}
};

/// Per-task "last exception" record persisted at <export-entry>/last_exception.
///
/// Single znode per export task. Updated atomically with the surrounding state
/// transition (status flip / lock release / retry counter bump) via a single
/// `tryMulti` Set op. The `count` field is best-effort and non-atomic: writers
/// `tryGet` the current value and write `count + 1` back without a version
/// check, so concurrent writers may under-count. This matches the semantics
/// used by `commit_attempts` and is documented in the system table.
struct LastExceptionEntry
{
String message;
String part; /// empty for task-level exceptions (commit failure, timeout)
String replica;
time_t time = 0;
size_t count = 0;

std::string toJsonString() const
{
Poco::JSON::Object json;
json.set("message", message);
json.set("part", part);
json.set("replica", replica);
json.set("time", time);
json.set("count", count);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

static LastExceptionEntry fromJsonString(const std::string & json_string)
{
LastExceptionEntry entry;
if (json_string.empty())
return entry;

Poco::JSON::Parser parser;
auto json = parser.parse(json_string).extract<Poco::JSON::Object::Ptr>();
chassert(json);

if (json->has("message"))
entry.message = json->getValue<String>("message");
if (json->has("part"))
entry.part = json->getValue<String>("part");
if (json->has("replica"))
entry.replica = json->getValue<String>("replica");
if (json->has("time"))
entry.time = json->getValue<time_t>("time");
if (json->has("count"))
entry.count = json->getValue<size_t>("count");
return entry;
}
};

struct ExportReplicatedMergeTreePartitionProcessedPartEntry
{
String part_name;
8 changes: 8 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionTaskEntry.h
@@ -1,5 +1,6 @@
#pragma once

#include <map>
#include <Storages/ExportReplicatedMergeTreePartitionManifest.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include "Core/QualifiedTableName.h"
@@ -32,6 +33,13 @@ struct ExportReplicatedMergeTreePartitionTaskEntry
    /// There is also a chance that this replica does not contain a given part, which is fine.
mutable std::vector<DataPartPtr> part_references;

/// In-memory mirror of <export-entry>/last_exception/<replica> leaves in ZK,
/// keyed by replica name (verbatim, not escaped). Refreshed on every poll() cycle
/// and on every status-change handler invocation; served verbatim to
/// system.replicated_partition_exports without any extra ZK read.
/// An empty map means no replica has recorded an exception yet for this task.
mutable std::map<String, LastExceptionEntry> last_exception_per_replica;

std::string getCompositeKey() const
{
const auto qualified_table_name = QualifiedTableName {manifest.destination_database, manifest.destination_table};