6 changes: 6 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/part_export.md
@@ -84,6 +84,12 @@ In case a table function is used as the destination, the schema can be omitted a
- **Default**: `true`
- **Description**: If set to `true`, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that even if a mutation would in practice only affect part/partition `x`, all the other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_filename_pattern

- **Type**: `String`
- **Default**: `{part_name}_{checksum}`
- **Description**: Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed and substituted on the fly. Additional server-level macros (e.g. `{shard}`, `{replica}`) are also supported.

## Examples

### Basic Export to S3
@@ -82,6 +82,12 @@ TO TABLE [destination_database.]destination_table
- **Default**: `true`
- **Description**: If set to `true`, throws if pending patch parts exist for a given part. Note that by default mutations are applied to all parts, which means that even if a mutation would in practice only affect part/partition `x`, all the other parts/partitions will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note that the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.

### export_merge_tree_part_filename_pattern

- **Type**: `String`
- **Default**: `{part_name}_{checksum}`
- **Description**: Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed and substituted on the fly. Additional server-level macros (e.g. `{shard}`, `{replica}`) are also supported.

## Examples

### Basic Export to S3
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -6934,6 +6934,9 @@ Throw an error if there are pending patch parts when exporting a merge tree part
)", 0) \
DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"(
Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions.
)", 0) \
DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"(
Pattern for the filename of the exported merge tree part. The `{part_name}` and `{checksum}` placeholders are computed and substituted on the fly. Additional macros are supported.
)", 0) \
\
/* ####################################################### */ \
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -67,6 +67,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
{"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
3 changes: 3 additions & 0 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -116,6 +116,7 @@ struct ExportReplicatedMergeTreePartitionManifest
size_t max_bytes_per_file;
size_t max_rows_per_file;
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
String filename_pattern;

std::string toJsonString() const
{
@@ -141,6 +142,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("create_time", create_time);
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
json.set("filename_pattern", filename_pattern);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
@@ -173,6 +175,7 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");
manifest.filename_pattern = json->getValue<String>("filename_pattern");

**P2: Preserve backward compatibility for manifest deserialization**

fromJsonString now unconditionally reads filename_pattern, but older metadata.json export manifests do not contain this key. On mixed-version or restarted clusters with pre-existing export tasks, json->getValue<String>("filename_pattern") will throw during task loading and prevent those exports from being processed. This field needs an optional read with a default (matching {part_name}_{checksum}) like other backward-compatible manifest fields.

if (json->has("file_already_exists_policy"))
{
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
35 changes: 34 additions & 1 deletion src/Storages/MergeTree/ExportPartTask.cpp
@@ -13,11 +13,14 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/Macros.h>
#include <Common/Exception.h>
#include <Common/ProfileEventsScope.h>
#include <Databases/DatabaseReplicated.h>
#include <Storages/MergeTree/ExportList.h>
#include <Formats/FormatFactory.h>
#include <Databases/enableAllExperimentalSettings.h>
#include <boost/algorithm/string/replace.hpp>

namespace ProfileEvents
{
@@ -43,6 +46,7 @@ namespace Setting
extern const SettingsUInt64 min_bytes_to_use_direct_io;
extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
extern const SettingsString export_merge_tree_part_filename_pattern;
}

namespace
@@ -80,6 +84,33 @@ namespace
plan_for_part.addStep(std::move(expression_step));
}
}

String buildDestinationFilename(
const MergeTreePartExportManifest & manifest,
const StorageID & storage_id,
const ContextPtr & local_context)
{
auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value;

boost::replace_all(filename, "{part_name}", manifest.data_part->name);
boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex());

Macros::MacroExpansionInfo macro_info;
macro_info.table_id = storage_id;

if (auto database = DatabaseCatalog::instance().tryGetDatabase(storage_id.database_name))
{
if (const auto replicated = dynamic_cast<const DatabaseReplicated *>(database.get()))
{
macro_info.shard = replicated->getShardName();
macro_info.replica = replicated->getReplicaName();
}
}

filename = local_context->getMacros()->expand(filename, macro_info);

return filename;
}
}

ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
@@ -136,8 +167,10 @@ bool ExportPartTask::executeStep()

try
{
const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context);

sink = destination_storage->import(
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
filename,
block_with_partition_values,
new_file_path_callback,
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
@@ -35,6 +35,8 @@ namespace
context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false);
context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false);

context_copy->setSetting("export_merge_tree_part_filename_pattern", manifest.filename_pattern);

return context_copy;
}
}
2 changes: 2 additions & 0 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -206,6 +206,7 @@ namespace Setting
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
extern const SettingsString export_merge_tree_part_filename_pattern;
}

namespace MergeTreeSetting
@@ -8277,6 +8278,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file];

manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value;
manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value;

ops.emplace_back(zkutil::makeCreateRequest(
fs::path(partition_exports_path) / "metadata.json",
@@ -0,0 +1,6 @@
<clickhouse>
<macros>
<shard>shard1</shard>
<replica>replica1</replica>
</macros>
</clickhouse>
@@ -0,0 +1,6 @@
<clickhouse>
<macros>
<shard>shard2</shard>
<replica>replica1</replica>
</macros>
</clickhouse>