From 8163fb0b93e28206c5aa4abe3b405ce8da15b7c5 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 10 Mar 2026 20:38:12 -0300 Subject: [PATCH 1/5] port --- .../mergetree-family/part_export.md | 6 + .../mergetree-family/partition_export.md | 6 + src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 1 + ...portReplicatedMergeTreePartitionManifest.h | 3 + src/Storages/MergeTree/ExportPartTask.cpp | 35 +++- .../ExportPartitionTaskScheduler.cpp | 2 + src/Storages/StorageReplicatedMergeTree.cpp | 2 + .../configs/macros_shard1_replica1.xml | 6 + .../configs/macros_shard2_replica1.xml | 6 + .../test.py | 195 ++++++++++++++++++ ...merge_tree_part_filename_pattern.reference | 16 ++ ...export_merge_tree_part_filename_pattern.sh | 49 +++++ 13 files changed, 329 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard1_replica1.xml create mode 100644 tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard2_replica1.xml create mode 100644 tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.reference create mode 100755 tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.sh diff --git a/docs/en/engines/table-engines/mergetree-family/part_export.md b/docs/en/engines/table-engines/mergetree-family/part_export.md index 287e0a17f3af..90d636a19941 100644 --- a/docs/en/engines/table-engines/mergetree-family/part_export.md +++ b/docs/en/engines/table-engines/mergetree-family/part_export.md @@ -84,6 +84,12 @@ In case a table function is used as the destination, the schema can be omitted a - **Default**: `true` - **Description**: If set to true, throws if pending patch parts exists for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affetct part/partition x, all the other parts/partition will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note the `IN PARTITION` clause is not properly implemented for plain MergeTree tables. +### export_merge_tree_part_filename_pattern + +- **Type**: `String` +- **Default**: `{part_name}_{checksum}` +- **Description**: Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported. + ## Examples ### Basic Export to S3 diff --git a/docs/en/engines/table-engines/mergetree-family/partition_export.md b/docs/en/engines/table-engines/mergetree-family/partition_export.md index d91f226dbbf6..af503ec5180a 100644 --- a/docs/en/engines/table-engines/mergetree-family/partition_export.md +++ b/docs/en/engines/table-engines/mergetree-family/partition_export.md @@ -82,6 +82,12 @@ TO TABLE [destination_database.]destination_table - **Default**: `true` - **Description**: If set to true, throws if pending patch parts exists for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affetct part/partition x, all the other parts/partition will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note the `IN PARTITION` clause is not properly implemented for plain MergeTree tables. +### export_merge_tree_part_filename_pattern + +- **Type**: `String` +- **Default**: `{part_name}_{checksum}` +- **Description**: Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported. + ## Examples ### Basic Export to S3 diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index c39b79a29e5d..3321d046da56 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6934,6 +6934,9 @@ Throw an error if there are pending patch parts when exporting a merge tree part )", 0) \ DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"( Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions. +)", 0) \ + DECLARE(String, export_merge_tree_part_filename_pattern, "{part_name}_{checksum}", R"( +Pattern for the filename of the exported merge tree part. The `part_name` and `checksum` are calculated and replaced on the fly. Additional macros are supported. )", 0) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 196efcc1b902..670e9ed71d8a 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -67,6 +67,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."}, {"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."}, {"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."}, + {"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index 7c18b8a881c2..583e57107a6a 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -116,6 +116,7 @@ struct ExportReplicatedMergeTreePartitionManifest size_t max_bytes_per_file; size_t max_rows_per_file; MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; + String filename_pattern; std::string toJsonString() const { @@ -141,6 +142,7 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("create_time", create_time); json.set("max_retries", max_retries); json.set("ttl_seconds", ttl_seconds); + json.set("filename_pattern", filename_pattern); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); Poco::JSON::Stringifier::stringify(json, oss); @@ -173,6 +175,7 @@ struct ExportReplicatedMergeTreePartitionManifest manifest.parquet_parallel_encoding = json->getValue("parquet_parallel_encoding"); manifest.max_bytes_per_file = json->getValue("max_bytes_per_file"); manifest.max_rows_per_file = json->getValue("max_rows_per_file"); + manifest.filename_pattern = json->getValue("filename_pattern"); if (json->has("file_already_exists_policy")) { const auto file_already_exists_policy = magic_enum::enum_cast(json->getValue("file_already_exists_policy")); diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index f506ebad4bbe..64b56969bf97 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -13,11 +13,14 @@ #include #include #include +#include #include #include +#include #include #include #include +#include namespace ProfileEvents { @@ -43,6 +46,7 @@ namespace Setting extern const SettingsUInt64 min_bytes_to_use_direct_io; extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file; extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; + extern const SettingsString export_merge_tree_part_filename_pattern; } namespace @@ -80,6 +84,33 @@ namespace plan_for_part.addStep(std::move(expression_step)); } } + + String buildDestinationFilename( + const MergeTreePartExportManifest & manifest, + const StorageID & storage_id, + const ContextPtr & local_context) + { + auto filename = manifest.settings[Setting::export_merge_tree_part_filename_pattern].value; + + boost::replace_all(filename, "{part_name}", manifest.data_part->name); + boost::replace_all(filename, "{checksum}", manifest.data_part->checksums.getTotalChecksumHex()); + + Macros::MacroExpansionInfo macro_info; + macro_info.table_id = storage_id; + + if (auto database = DatabaseCatalog::instance().tryGetDatabase(storage_id.database_name)) + { + if (const auto replicated = dynamic_cast(database.get())) + { + macro_info.shard = replicated->getShardName(); + macro_info.replica = replicated->getReplicaName(); + } + } + + filename = local_context->getMacros()->expand(filename, macro_info); + + return filename; + } } ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_) @@ -136,8 +167,10 @@ bool ExportPartTask::executeStep() try { + const auto filename = buildDestinationFilename(manifest, destination_storage_id, local_context); + sink = destination_storage->import( - manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), + filename, block_with_partition_values, new_file_path_callback, manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite, diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index 925d7eafe412..a38ee157b6d7 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -35,6 +35,8 @@ namespace context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false); context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false); + context_copy->setSetting("export_merge_tree_part_filename_pattern", manifest.filename_pattern); + return context_copy; } } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 4b04fe1915f1..986b2bc34147 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -206,6 +206,7 @@ namespace Setting extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations; extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts; + extern const SettingsString export_merge_tree_part_filename_pattern; } namespace MergeTreeSetting @@ -8277,6 +8278,7 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file]; manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value; + manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value; ops.emplace_back(zkutil::makeCreateRequest( fs::path(partition_exports_path) / "metadata.json", diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard1_replica1.xml b/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard1_replica1.xml new file mode 100644 index 000000000000..bae1ce119255 --- /dev/null +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard1_replica1.xml @@ -0,0 +1,6 @@ + + + shard1 + replica1 + + diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard2_replica1.xml b/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard2_replica1.xml new file mode 100644 index 000000000000..fb9a587e736d --- /dev/null +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/configs/macros_shard2_replica1.xml @@ -0,0 +1,6 @@ + + + shard2 + replica1 + + diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index e84921b141a2..2a0a8cc5f073 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -101,6 +101,26 @@ def cluster(): stay_alive=True, with_zookeeper=True, ) + + # Sharded instances for filename pattern tests + cluster.add_instance( + "shard1_replica1", + main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard1_replica1.xml"], + user_configs=["configs/users.d/profile.xml"], + with_minio=True, + stay_alive=True, + with_zookeeper=True, + ) + + cluster.add_instance( + "shard2_replica1", + main_configs=["configs/named_collections.xml", "configs/allow_experimental_export_partition.xml", "configs/macros_shard2_replica1.xml"], + user_configs=["configs/users.d/profile.xml"], + with_minio=True, + stay_alive=True, + with_zookeeper=True, + ) + logging.info("Starting cluster...") cluster.start() yield cluster @@ -1008,3 +1028,178 @@ def test_export_partition_with_mixed_computed_columns(cluster): AND partition_id = '1' """) assert status.strip() == "COMPLETED", f"Expected COMPLETED status, got: {status}" + + +def skip_if_remote_database_disk_enabled(cluster): + """Skip test if any instance in the cluster has remote database disk enabled. + + In this branch, this is useful for test_export_partition_from_replicated_database_uses_db_shard_replica_macros + """ + for instance in cluster.instances.values(): + if instance.with_remote_database_disk: + pytest.skip("Test cannot run with remote database disk enabled (db disk)") + + +def test_sharded_export_partition_with_filename_pattern(cluster): + """Test that export partition with filename pattern prevents collisions in sharded setup.""" + shard1_r1 = cluster.instances["shard1_replica1"] + shard2_r1 = cluster.instances["shard2_replica1"] + watcher_node = cluster.instances["watcher_node"] + + mt_table = "sharded_mt_table" + s3_table = "sharded_s3_table" + + # Create sharded tables on all shards with same partition data (same part names) + # Each shard uses different ZooKeeper path via {shard} macro + create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1") + create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1") + create_s3_table(watcher_node, s3_table) + + # Export partition from both shards with filename pattern including shard + # This should prevent filename collisions + shard1_r1.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'" + ) + shard2_r1.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_filename_pattern = '{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'" + ) + + # Wait for exports to complete + wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED") + wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED") + + total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip() + assert total_count == "6", f"Expected 6 total rows (3 from each shard), got {total_count}" + + # Verify filenames contain shard information (check via S3 directly) + # Get all files from S3 - query from watcher_node since S3 is shared + files_shard1 = watcher_node.query( + f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard1%' LIMIT 1" + ).strip() + files_shard2 = watcher_node.query( + f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**', format='One') WHERE _file LIKE '%shard2%' LIMIT 1" + ).strip() + + # Both shards should have files with their shard names + assert "shard1" in files_shard1 or files_shard1 == "", f"Expected shard1 in filenames, got: {files_shard1}" + assert "shard2" in files_shard2 or files_shard2 == "", f"Expected shard2 in filenames, got: {files_shard2}" + + +def test_export_partition_from_replicated_database_uses_db_shard_replica_macros(cluster): + """Test that {shard} and {replica} in the filename pattern are expanded from the + DatabaseReplicated identity, NOT from server config macros. + + replica1 has no / entries in its server config section. + Without the fix buildDestinationFilename() leaves macro_info.shard/replica unset, so + Macros::expand() falls through to the config-macros lookup and throws NO_ELEMENTS_IN_CONFIG. + With the fix the DatabaseReplicated shard_name / replica_name are injected into macro_info + before the expand call, and the pattern resolves correctly. + """ + + # The remote disk test suite sets the shard and replica macros in https://github.com/Altinity/ClickHouse/blob/bbabcaa96e8b7fe8f70ecd0bd4f76fb0f76f2166/tests/integration/helpers/cluster.py#L4356 + # When expanding the macros, the configured ones are preferred over the ones from the DatabaseReplicated definition. + # Therefore, this test fails. It is easier to skip it than to fix it. + skip_if_remote_database_disk_enabled(cluster) + + node = cluster.instances["replica1"] + watcher_node = cluster.instances["watcher_node"] + + postfix = str(uuid.uuid4()).replace("-", "_") + db_name = f"repdb_{postfix}" + table_name = "mt_table" + s3_table = f"s3_dbreplicated_{postfix}" + + # These values exist only in the DatabaseReplicated definition – they are NOT + # present anywhere in replica1's server config . + db_shard = "db_shard_x" + db_replica = "db_replica_y" + + node.query( + f"CREATE DATABASE {db_name} " + f"ENGINE = Replicated('/clickhouse/databases/{db_name}', '{db_shard}', '{db_replica}')") + + node.query(f""" + CREATE TABLE {db_name}.{table_name} + (id UInt64, year UInt16) + ENGINE = ReplicatedMergeTree() + PARTITION BY year ORDER BY tuple()""") + + node.query(f"INSERT INTO {db_name}.{table_name} VALUES (1, 2020), (2, 2020), (3, 2020)") + # Stop merges so part names stay stable during the test. + node.query(f"SYSTEM STOP MERGES {db_name}.{table_name}") + + node.query( + f"CREATE TABLE {s3_table} (id UInt64, year UInt16) " + f"ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive') " + f"PARTITION BY year") + + watcher_node.query( + f"CREATE TABLE {s3_table} (id UInt64, year UInt16) " + f"ENGINE = S3(s3_conn, filename='{s3_table}', format=Parquet, partition_strategy='hive') " + f"PARTITION BY year") + + # Export with {shard} and {replica} in the pattern. + # Before the fix: Macros::expand throws NO_ELEMENTS_IN_CONFIG because replica1 has + # no / server config macros. + # After the fix: DatabaseReplicated's shard_name/replica_name are wired into + # macro_info before the expand call, so this succeeds and produces the right names. + node.query( + f"ALTER TABLE {db_name}.{table_name} EXPORT PARTITION ID '2020' TO TABLE {s3_table} " + f"SETTINGS export_merge_tree_part_filename_pattern = " + f"'{{part_name}}_{{shard}}_{{replica}}_{{checksum}}'") + + # A FAILED status here almost certainly means the macro expansion threw + # NO_ELEMENTS_IN_CONFIG (i.e. the fix is missing or broken). + wait_for_export_status(node, table_name, s3_table, "2020", "COMPLETED") + + # Data should have landed in S3. + count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip() + assert count == "3", f"Expected 3 exported rows, got {count}" + + # The exported filename must contain the exact shard and replica names from the + # DatabaseReplicated definition, proving the fix injected them (not server config macros). + filename = watcher_node.query( + f"SELECT _file FROM s3(s3_conn, filename='{s3_table}/**/*.parquet', format='One') LIMIT 1" + ).strip() + + assert db_shard in filename, ( + f"Expected filename to contain DatabaseReplicated shard '{db_shard}', got: {filename!r}. " + "Suggests {shard} was not expanded from the DatabaseReplicated identity.") + + assert db_replica in filename, ( + f"Expected filename to contain DatabaseReplicated replica '{db_replica}', got: {filename!r}. " + "Suggests {replica} was not expanded from the DatabaseReplicated identity.") + + +def test_sharded_export_partition_default_pattern(cluster): + shard1_r1 = cluster.instances["shard1_replica1"] + shard2_r1 = cluster.instances["shard2_replica1"] + watcher_node = cluster.instances["watcher_node"] + + mt_table = "sharded_mt_table_default" + s3_table = "sharded_s3_table_default" + + # Create sharded tables with different ZooKeeper paths per shard + create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1") + create_sharded_tables_and_insert_data(shard2_r1, mt_table, s3_table, "replica1") + create_s3_table(watcher_node, s3_table) + + # Export with default pattern ({part_name}_{checksum}) - may cause collisions if parts have same name and the same checksum + shard1_r1.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}" + ) + shard2_r1.query( + f"ALTER TABLE {mt_table} EXPORT PARTITION ID '2020' TO TABLE {s3_table}" + ) + + wait_for_export_status(shard1_r1, mt_table, s3_table, "2020", "COMPLETED") + wait_for_export_status(shard2_r1, mt_table, s3_table, "2020", "COMPLETED") + + # Both exports should complete (even if there are collisions, the overwrite policy handles it) + # S3 tables are shared, so query from watcher_node + total_count = watcher_node.query(f"SELECT count() FROM {s3_table} WHERE year = 2020").strip() + + # only one file with 3 rows should be present + assert int(total_count) == 3, f"Expected 3 rows, got {total_count}" diff --git a/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.reference b/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.reference new file mode 100644 index 000000000000..8016f5aa113e --- /dev/null +++ b/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.reference @@ -0,0 +1,16 @@ +---- Test: Default pattern {part_name}_{checksum} +1 2020 +2 2020 +3 2020 +---- Verify filename matches 2020_1_1_0_*.1.parquet +1 +---- Test: Custom prefix pattern +4 2021 +---- Verify filename matches myprefix_2021_2_2_0.1.parquet +1 +---- Test: Pattern with macros +1 2020 +2 2020 +3 2020 +---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests) +1 diff --git a/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.sh b/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.sh new file mode 100755 index 000000000000..12b47f4f2664 --- /dev/null +++ b/tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.sh @@ -0,0 +1,49 @@ +#!/usr/bin/env bash +# Tags: no-fasttest +# Tag no-fasttest: requires s3 storage + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +R=$RANDOM +mt="mt_${R}" +dest1="fp_dest1_${R}" +dest2="fp_dest2_${R}" +dest3="fp_dest3_${R}" + +query() { + $CLICKHOUSE_CLIENT --query "$1" +} + +query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3" + +query "CREATE TABLE $mt (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()" +query "INSERT INTO $mt VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" + +query "CREATE TABLE $dest1 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest1', format=Parquet, partition_strategy='hive') PARTITION BY year" +query "CREATE TABLE $dest2 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest2', format=Parquet, partition_strategy='hive') PARTITION BY year" +query "CREATE TABLE $dest3 (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$dest3', format=Parquet, partition_strategy='hive') PARTITION BY year" + +echo "---- Test: Default pattern {part_name}_{checksum}" +query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest1 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{part_name}_{checksum}'" +sleep 3 +query "SELECT * FROM $dest1 ORDER BY id" +echo "---- Verify filename matches 2020_1_1_0_*.1.parquet" +query "SELECT count() FROM s3(s3_conn, filename='$dest1/**/2020_1_1_0_*.1.parquet', format='One')" + +echo "---- Test: Custom prefix pattern" +query "ALTER TABLE $mt EXPORT PART '2021_2_2_0' TO TABLE $dest2 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = 'myprefix_{part_name}'" +sleep 3 +query "SELECT * FROM $dest2 ORDER BY id" +echo "---- Verify filename matches myprefix_2021_2_2_0.1.parquet" +query "SELECT count() FROM s3(s3_conn, filename='$dest2/**/myprefix_2021_2_2_0.1.parquet', format='One')" + +echo "---- Test: Pattern with macros" +query "ALTER TABLE $mt EXPORT PART '2020_1_1_0' TO TABLE $dest3 SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_filename_pattern = '{database}_{table}_{part_name}'" +sleep 3 +query "SELECT * FROM $dest3 ORDER BY id" +echo "---- Verify macros expanded (no literal braces in parquet filenames, that's the best we can do for stateless tests)" +query "SELECT count() = 0 FROM s3(s3_conn, filename='$dest3/**/*.1.parquet', format='One') WHERE _file LIKE '%{%'" + +query "DROP TABLE IF EXISTS $mt, $dest1, $dest2, $dest3" From a3d35bb3adf2ce2f169e40c276ac7dfc1e3fbf51 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 10 Mar 2026 20:42:22 -0300 Subject: [PATCH 2/5] no need to skip tests in 258 --- .../test.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index 2a0a8cc5f073..4e0d9feb31ed 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -1030,16 +1030,6 @@ def test_export_partition_with_mixed_computed_columns(cluster): assert status.strip() == "COMPLETED", f"Expected COMPLETED status, got: {status}" -def skip_if_remote_database_disk_enabled(cluster): - """Skip test if any instance in the cluster has remote database disk enabled. - - In this branch, this is useful for test_export_partition_from_replicated_database_uses_db_shard_replica_macros - """ - for instance in cluster.instances.values(): - if instance.with_remote_database_disk: - pytest.skip("Test cannot run with remote database disk enabled (db disk)") - - def test_sharded_export_partition_with_filename_pattern(cluster): """Test that export partition with filename pattern prevents collisions in sharded setup.""" shard1_r1 = cluster.instances["shard1_replica1"] @@ -1098,11 +1088,6 @@ def test_export_partition_from_replicated_database_uses_db_shard_replica_macros( before the expand call, and the pattern resolves correctly. """ - # The remote disk test suite sets the shard and replica macros in https://github.com/Altinity/ClickHouse/blob/bbabcaa96e8b7fe8f70ecd0bd4f76fb0f76f2166/tests/integration/helpers/cluster.py#L4356 - # When expanding the macros, the configured ones are preferred over the ones from the DatabaseReplicated definition. - # Therefore, this test fails. It is easier to skip it than to fix it. - skip_if_remote_database_disk_enabled(cluster) - node = cluster.instances["replica1"] watcher_node = cluster.instances["watcher_node"] From c384d5bf9ed598da2572b08673644c35fa809fd6 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 10 Mar 2026 20:47:11 -0300 Subject: [PATCH 3/5] fix port glitch --- src/Storages/MergeTree/ExportPartTask.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index 64b56969bf97..22ef7953f690 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -167,7 +167,7 @@ bool ExportPartTask::executeStep() try { - const auto filename = buildDestinationFilename(manifest, destination_storage_id, local_context); + const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context); sink = destination_storage->import( filename, From 9bae3672341c598d0a4e8b63af414599c0da7014 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 10 Mar 2026 20:53:14 -0300 Subject: [PATCH 4/5] fix tst port glitch --- .../test.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index 4e0d9feb31ed..f18fa38bee8c 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -139,6 +139,14 @@ def create_tables_and_insert_data(node, mt_table, s3_table, replica_name): create_s3_table(node, s3_table) +def create_sharded_tables_and_insert_data(node, mt_table, s3_table, replica_name): + """Create sharded ReplicatedMergeTree table with {shard} macro in ZooKeeper path.""" + node.query(f"CREATE TABLE {mt_table} (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{shard}}/{mt_table}', '{replica_name}') PARTITION BY year ORDER BY tuple()") + node.query(f"INSERT INTO {mt_table} VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)") + + create_s3_table(node, s3_table) + + def test_restart_nodes_during_export(cluster): node = cluster.instances["replica1"] node2 = cluster.instances["replica2"] From 71a4b9a1a5a7b923ef3b83759e77c159bc90abc5 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 12 Mar 2026 09:10:57 -0300 Subject: [PATCH 5/5] flaky test improvement, perhaps we should disable it --- .../test.py | 107 +++++++++++------- 1 file changed, 64 insertions(+), 43 deletions(-) diff --git a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py index f18fa38bee8c..af899b91ff70 100644 --- a/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py +++ b/tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py @@ -152,8 +152,9 @@ def test_restart_nodes_during_export(cluster): node2 = cluster.instances["replica2"] watcher_node = cluster.instances["watcher_node"] - mt_table = "disaster_mt_table" - s3_table = "disaster_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"disaster_mt_table_{postfix}" + s3_table = f"disaster_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") create_tables_and_insert_data(node2, mt_table, s3_table, "replica2") @@ -227,8 +228,9 @@ def test_kill_export(cluster): node2 = cluster.instances["replica2"] watcher_node = cluster.instances["watcher_node"] - mt_table = "kill_export_mt_table" - s3_table = "kill_export_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"kill_export_mt_table_{postfix}" + s3_table = f"kill_export_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") create_tables_and_insert_data(node2, mt_table, s3_table, "replica2") @@ -287,8 +289,9 @@ def test_drop_source_table_during_export(cluster): # node2 = cluster.instances["replica2"] watcher_node = cluster.instances["watcher_node"] - mt_table = "drop_source_table_during_export_mt_table" - s3_table = "drop_source_table_during_export_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"drop_source_table_during_export_mt_table_{postfix}" + s3_table = f"drop_source_table_during_export_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") # create_tables_and_insert_data(node2, mt_table, s3_table, "replica2") @@ -337,9 +340,10 @@ def test_drop_source_table_during_export(cluster): def test_concurrent_exports_to_different_targets(cluster): node = cluster.instances["replica1"] - mt_table = "concurrent_diff_targets_mt_table" - s3_table_a = "concurrent_diff_targets_s3_a" - s3_table_b = "concurrent_diff_targets_s3_b" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"concurrent_diff_targets_mt_table_{postfix}" + s3_table_a = f"concurrent_diff_targets_s3_a_{postfix}" + s3_table_b = f"concurrent_diff_targets_s3_b_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table_a, "replica1") create_s3_table(node, s3_table_b) @@ -374,8 +378,9 @@ def test_concurrent_exports_to_different_targets(cluster): def test_failure_is_logged_in_system_table(cluster): node = cluster.instances["replica1"] - mt_table = "failure_is_logged_in_system_table_mt_table" - s3_table = "failure_is_logged_in_system_table_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"failure_is_logged_in_system_table_mt_table_{postfix}" + s3_table = f"failure_is_logged_in_system_table_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -439,8 +444,9 @@ def test_failure_is_logged_in_system_table(cluster): def test_inject_short_living_failures(cluster): node = cluster.instances["replica1"] - mt_table = "inject_short_living_failures_mt_table" - s3_table = "inject_short_living_failures_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"inject_short_living_failures_mt_table_{postfix}" + s3_table = f"inject_short_living_failures_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -504,8 +510,9 @@ def test_inject_short_living_failures(cluster): def test_export_ttl(cluster): node = cluster.instances["replica1"] - mt_table = "export_ttl_mt_table" - s3_table = "export_ttl_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"export_ttl_mt_table_{postfix}" + s3_table = f"export_ttl_s3_table_{postfix}" expiration_time = 3 @@ -539,8 +546,9 @@ def test_export_ttl(cluster): def test_export_partition_file_already_exists_policy(cluster): node = cluster.instances["replica1"] - mt_table = "export_partition_file_already_exists_policy_mt_table" - s3_table = "export_partition_file_already_exists_policy_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"export_partition_file_already_exists_policy_mt_table_{postfix}" + s3_table = f"export_partition_file_already_exists_policy_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -626,8 +634,9 @@ def test_export_partition_file_already_exists_policy(cluster): def test_export_partition_feature_is_disabled(cluster): replica_with_export_disabled = cluster.instances["replica_with_export_disabled"] - mt_table = "export_partition_feature_is_disabled_mt_table" - s3_table = "export_partition_feature_is_disabled_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"export_partition_feature_is_disabled_mt_table_{postfix}" + s3_table = f"export_partition_feature_is_disabled_s3_table_{postfix}" create_tables_and_insert_data(replica_with_export_disabled, mt_table, s3_table, "replica1") @@ -646,8 +655,9 @@ def test_export_partition_permissions(cluster): """ node = cluster.instances["replica1"] - mt_table = "permissions_mt_table" - s3_table = "permissions_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"permissions_mt_table_{postfix}" + s3_table = f"permissions_s3_table_{postfix}" # Create tables as default user create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -715,8 +725,9 @@ def test_export_partition_permissions(cluster): def test_multiple_exports_within_a_single_query(cluster): node = cluster.instances["replica1"] - mt_table = "multiple_exports_within_a_single_query_mt_table" - s3_table = "multiple_exports_within_a_single_query_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"multiple_exports_within_a_single_query_mt_table_{postfix}" + s3_table = f"multiple_exports_within_a_single_query_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -753,8 +764,9 @@ def test_pending_mutations_throw_before_export_partition(cluster): """Test that pending mutations before export partition throw an error.""" node = cluster.instances["replica1"] - mt_table = "pending_mutations_throw_partition_mt_table" - s3_table = "pending_mutations_throw_partition_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"pending_mutations_throw_partition_mt_table_{postfix}" + s3_table = f"pending_mutations_throw_partition_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -777,8 +789,9 @@ def test_pending_mutations_skip_before_export_partition(cluster): """Test that pending mutations before export partition are skipped with throw_on_pending_mutations=false.""" node = cluster.instances["replica1"] - mt_table = "pending_mutations_skip_partition_mt_table" - s3_table = "pending_mutations_skip_partition_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"pending_mutations_skip_partition_mt_table_{postfix}" + s3_table = f"pending_mutations_skip_partition_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -806,8 +819,9 @@ def test_pending_patch_parts_throw_before_export_partition(cluster): """Test that pending patch parts before export partition throw an error with default settings.""" node = cluster.instances["replica1"] - mt_table = "pending_patches_throw_partition_mt_table" - s3_table = "pending_patches_throw_partition_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"pending_patches_throw_partition_mt_table_{postfix}" + s3_table = f"pending_patches_throw_partition_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -829,8 +843,9 @@ def test_pending_patch_parts_skip_before_export_partition(cluster): """Test that pending patch parts before export partition are skipped with throw_on_pending_patch_parts=false.""" node = cluster.instances["replica1"] - mt_table = "pending_patches_skip_partition_mt_table" - s3_table = "pending_patches_skip_partition_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"pending_patches_skip_partition_mt_table_{postfix}" + s3_table = f"pending_patches_skip_partition_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -855,8 +870,9 @@ def test_mutations_after_export_partition_started(cluster): """Test that mutations applied after export partition starts don't affect the exported data.""" node = cluster.instances["replica1"] - mt_table = "mutations_after_export_partition_mt_table" - s3_table = "mutations_after_export_partition_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"mutations_after_export_partition_mt_table_{postfix}" + s3_table = f"mutations_after_export_partition_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -900,8 +916,9 @@ def test_patch_parts_after_export_partition_started(cluster): """Test that patch parts created after export partition starts don't affect the exported data.""" node = cluster.instances["replica1"] - mt_table = "patches_after_export_partition_mt_table" - s3_table = "patches_after_export_partition_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"patches_after_export_partition_mt_table_{postfix}" + s3_table = f"patches_after_export_partition_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -947,8 +964,9 @@ def test_mutation_in_partition_clause(cluster): allow exports of unaffected partitions to succeed.""" node = cluster.instances["replica1"] - mt_table = "mutation_in_partition_clause_mt_table" - s3_table = "mutation_in_partition_clause_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"mutation_in_partition_clause_mt_table_{postfix}" + s3_table = f"mutation_in_partition_clause_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table, "replica1") @@ -986,8 +1004,9 @@ def test_export_partition_with_mixed_computed_columns(cluster): """Test export partition with ALIAS, MATERIALIZED, and EPHEMERAL columns.""" node = cluster.instances["replica1"] - mt_table = "mixed_computed_mt_table" - s3_table = "mixed_computed_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"mixed_computed_mt_table_{postfix}" + s3_table = f"mixed_computed_s3_table_{postfix}" node.query(f""" CREATE TABLE {mt_table} ( @@ -1044,8 +1063,9 @@ def test_sharded_export_partition_with_filename_pattern(cluster): shard2_r1 = cluster.instances["shard2_replica1"] watcher_node = cluster.instances["watcher_node"] - mt_table = "sharded_mt_table" - s3_table = "sharded_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"sharded_mt_table_{postfix}" + s3_table = f"sharded_s3_table_{postfix}" # Create sharded tables on all shards with same partition data (same part names) # Each shard uses different ZooKeeper path via {shard} macro @@ -1171,8 +1191,9 @@ def test_sharded_export_partition_default_pattern(cluster): shard2_r1 = cluster.instances["shard2_replica1"] watcher_node = cluster.instances["watcher_node"] - mt_table = "sharded_mt_table_default" - s3_table = "sharded_s3_table_default" + postfix = str(uuid.uuid4()).replace("-", "_") + mt_table = f"sharded_mt_table_default_{postfix}" + s3_table = f"sharded_s3_table_default_{postfix}" # Create sharded tables with different ZooKeeper paths per shard create_sharded_tables_and_insert_data(shard1_r1, mt_table, s3_table, "replica1")