diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index 8a4ac6f12df2..02f310a9e23b 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -22,6 +22,8 @@ extern const int BAD_ARGUMENTS; namespace { + using PartitionExpressionActionsAndColumnName = IPartitionStrategy::PartitionExpressionActionsAndColumnName; + /// Builds AST for hive partition path format /// `partition_column_1=toString(partition_value_expr_1)/ ... /partition_column_N=toString(partition_value_expr_N)/` /// for given partition columns list and a partition by AST. @@ -89,6 +91,33 @@ namespace return result; } + ASTPtr buildToStringPartitionAST(ASTPtr partition_by) + { + ASTs arguments(1, partition_by); + return makeASTFunction("toString", std::move(arguments)); + } + + template <typename BuildAST> + PartitionExpressionActionsAndColumnName getCachedOrBuildActions( + const std::optional<PartitionExpressionActionsAndColumnName> & cached_result, + const IPartitionStrategy & partition_strategy, + BuildAST && build_ast) + { + if (cached_result) + return *cached_result; + + auto expression_ast = build_ast(); + return partition_strategy.getPartitionExpressionActions(expression_ast); + } + + void cacheDeterministicActions( + std::optional<PartitionExpressionActionsAndColumnName> & cached_result, + const PartitionExpressionActionsAndColumnName & actions_with_column) + { + if (!actions_with_column.actions->getActionsDAG().hasNonDeterministic()) + cached_result = actions_with_column; + } + std::shared_ptr<HiveStylePartitionStrategy> createHivePartitionStrategy( ASTPtr partition_by, const Block & sample_block, @@ -191,11 +220,8 @@ const KeyDescription & IPartitionStrategy::getPartitionKeyDescription() const } IPartitionStrategy::PartitionExpressionActionsAndColumnName -IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) +IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) const { - if (cached_result) - return *cached_result; - auto syntax_result = TreeRewriter(context).analyze(expression_ast, 
sample_block.getNamesAndTypesList()); auto actions_dag = ExpressionAnalyzer(expression_ast, syntax_result, context).getActionsDAG(false); @@ -204,9 +230,6 @@ IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) std::move(actions_dag), ExpressionActionsSettings(context), false); result.column_name = expression_ast->getColumnName(); - if (!result.actions->getActionsDAG().hasNonDeterministic()) - cached_result = result; - return result; } @@ -259,13 +282,19 @@ std::shared_ptr PartitionStrategyFactory::get(StrategyType s WildcardPartitionStrategy::WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_) : IPartitionStrategy(partition_key_description_, sample_block_, context_) { + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildToStringPartitionAST(partition_key_description.definition_ast); }); + cacheDeterministicActions(cached_result, actions_with_column); } -ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) +ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) const { - ASTs arguments(1, partition_key_description.definition_ast); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - auto actions_with_column = getPartitionExpressionActions(partition_by_string); + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildToStringPartitionAST(partition_key_description.definition_ast); }); Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); block_with_partition_by_expr.setColumns(chunk.getColumns()); @@ -274,11 +303,13 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) return block_with_partition_by_expr.getByName(actions_with_column.column_name).column; } -ColumnPtr WildcardPartitionStrategy::computePartitionKey(Block & block) +ColumnPtr 
WildcardPartitionStrategy::computePartitionKey(Block & block) const { - ASTs arguments(1, partition_key_description.definition_ast); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - auto actions_with_column = getPartitionExpressionActions(partition_by_string); + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildToStringPartitionAST(partition_key_description.definition_ast); }); + actions_with_column.actions->execute(block); return block.getByName(actions_with_column.column_name).column; } @@ -300,12 +331,20 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy( } block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set); + + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); }); + cacheDeterministicActions(cached_result, actions_with_column); } -ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) +ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) const { - auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - auto actions_with_column = getPartitionExpressionActions(hive_ast); + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); }); Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); block_with_partition_by_expr.setColumns(chunk.getColumns()); @@ -314,10 +353,13 @@ ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) return block_with_partition_by_expr.getByName(actions_with_column.column_name).column; } -ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block) +ColumnPtr 
HiveStylePartitionStrategy::computePartitionKey(Block & block) const { - auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); - auto actions_with_column = getPartitionExpressionActions(hive_ast); + auto actions_with_column = getCachedOrBuildActions( + cached_result, + *this, + [&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); }); + actions_with_column.actions->execute(block); return block.getByName(actions_with_column.column_name).column; } diff --git a/src/Storages/IPartitionStrategy.h b/src/Storages/IPartitionStrategy.h index 91397de2362d..1378762c6911 100644 --- a/src/Storages/IPartitionStrategy.h +++ b/src/Storages/IPartitionStrategy.h @@ -27,9 +27,9 @@ struct IPartitionStrategy virtual ~IPartitionStrategy() = default; - virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0; + virtual ColumnPtr computePartitionKey(const Chunk & chunk) const = 0; - virtual ColumnPtr computePartitionKey(Block & block) = 0; + virtual ColumnPtr computePartitionKey(Block & block) const = 0; virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) { @@ -48,7 +48,7 @@ struct IPartitionStrategy NamesAndTypesList getPartitionColumns() const; const KeyDescription & getPartitionKeyDescription() const; - PartitionExpressionActionsAndColumnName getPartitionExpressionActions(ASTPtr & expression_ast); + PartitionExpressionActionsAndColumnName getPartitionExpressionActions(ASTPtr & expression_ast) const; protected: const KeyDescription partition_key_description; @@ -91,9 +91,9 @@ struct WildcardPartitionStrategy : IPartitionStrategy { WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_); - ColumnPtr computePartitionKey(const Chunk & chunk) override; + ColumnPtr computePartitionKey(const Chunk & chunk) const override; - ColumnPtr computePartitionKey(Block & block) override; + ColumnPtr computePartitionKey(Block & 
block) const override; }; /* @@ -110,9 +110,9 @@ struct HiveStylePartitionStrategy : IPartitionStrategy const std::string & file_format_, bool partition_columns_in_data_file_); - ColumnPtr computePartitionKey(const Chunk & chunk) override; + ColumnPtr computePartitionKey(const Chunk & chunk) const override; - ColumnPtr computePartitionKey(Block & block) override; + ColumnPtr computePartitionKey(Block & block) const override; ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override; Block getFormatHeader() override; diff --git a/tests/integration/test_export_merge_tree_part_to_object_storage/test.py b/tests/integration/test_export_merge_tree_part_to_object_storage/test.py index 1539cb130598..8adec2908d1c 100644 --- a/tests/integration/test_export_merge_tree_part_to_object_storage/test.py +++ b/tests/integration/test_export_merge_tree_part_to_object_storage/test.py @@ -52,8 +52,10 @@ def test_drop_column_during_export_snapshot(cluster): skip_if_remote_database_disk_enabled(cluster) node = cluster.instances["node1"] - mt_table = "mutations_snapshot_mt_table" - s3_table = "mutations_snapshot_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"mutations_snapshot_mt_table_{postfix}" + s3_table = f"mutations_snapshot_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -104,8 +106,10 @@ def test_add_column_during_export(cluster): skip_if_remote_database_disk_enabled(cluster) node = cluster.instances["node1"] - mt_table = "add_column_during_export_mt_table" - s3_table = "add_column_during_export_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"add_column_during_export_mt_table_{postfix}" + s3_table = f"add_column_during_export_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -157,8 +161,10 @@ def test_pending_mutations_throw_before_export(cluster): """Test that pending mutations before export throw an error with default settings.""" node = 
cluster.instances["node1"] - mt_table = "pending_mutations_throw_mt_table" - s3_table = "pending_mutations_throw_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"pending_mutations_throw_mt_table_{postfix}" + s3_table = f"pending_mutations_throw_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -180,8 +186,10 @@ def test_pending_mutations_skip_before_export(cluster): """Test that pending mutations before export are skipped with throw_on_pending_mutations=false.""" node = cluster.instances["node1"] - mt_table = "pending_mutations_skip_mt_table" - s3_table = "pending_mutations_skip_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"pending_mutations_skip_mt_table_{postfix}" + s3_table = f"pending_mutations_skip_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -210,8 +218,10 @@ def test_data_mutations_after_export_started(cluster): skip_if_remote_database_disk_enabled(cluster) node = cluster.instances["node1"] - mt_table = "mutations_after_export_mt_table" - s3_table = "mutations_after_export_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"mutations_after_export_mt_table_{postfix}" + s3_table = f"mutations_after_export_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -256,8 +266,10 @@ def test_pending_patch_parts_throw_before_export(cluster): """Test that pending patch parts before export throw an error with default settings.""" node = cluster.instances["node1"] - mt_table = "pending_patches_throw_mt_table" - s3_table = "pending_patches_throw_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"pending_patches_throw_mt_table_{postfix}" + s3_table = f"pending_patches_throw_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) @@ -279,8 +291,10 @@ def test_pending_patch_parts_skip_before_export(cluster): """Test that pending patch parts before export are skipped 
with throw_on_pending_patch_parts=false.""" node = cluster.instances["node1"] - mt_table = "pending_patches_skip_mt_table" - s3_table = "pending_patches_skip_s3_table" + postfix = str(uuid.uuid4()).replace("-", "_") + + mt_table = f"pending_patches_skip_mt_table_{postfix}" + s3_table = f"pending_patches_skip_s3_table_{postfix}" create_tables_and_insert_data(node, mt_table, s3_table) diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.reference index ce4f112ad1fa..b7f1f4411bf6 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.reference +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.reference @@ -1,6 +1,8 @@ ---- Test max_bytes and max_rows per file ---- Table function with schema inheritance (no schema specified) ---- Table function with explicit compatible schema +Waiting for exports to complete (timeout: 60s)... +All exports completed. 
---- Count files in big_destination_max_bytes, should be 5 (4 parquet, 1 commit) 5 ---- Count rows in big_table and big_destination_max_bytes diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh index 449720bbf7a3..16c86569f82b 100755 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh @@ -55,8 +55,35 @@ query "ALTER TABLE $mt_table_tf EXPORT PART '2022_1_1_0' TO TABLE FUNCTION s3(s3 echo "---- Table function with explicit compatible schema" query "ALTER TABLE $mt_table_tf EXPORT PART '2023_2_2_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_explicit', format='Parquet', structure='id UInt64, value String, year UInt16', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1" -# ONE BIG SLEEP after all exports (longer because it writes multiple files) -sleep 20 +# Wait for all exports to complete +wait_for_exports() { + local timeout=${1:-60} + local poll_interval=${2:-0.5} + local start_time=$(date +%s) + local elapsed=0 + + echo "Waiting for exports to complete (timeout: ${timeout}s)..." + + while [ $elapsed -lt $timeout ]; do + # Check if any exports are still in progress for our tables/parts + local active_exports=$(query "SELECT count() FROM system.exports WHERE (source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0'))" | tr -d '\n') + + if [ "$active_exports" = "0" ]; then + echo "All exports completed." 
+ return 0 + fi + + sleep $poll_interval + elapsed=$(($(date +%s) - start_time)) + done + + echo "Timeout waiting for exports to complete after ${timeout}s" + echo "Remaining exports:" + query "SELECT source_table, part_name, elapsed, rows_read, total_rows_to_read FROM system.exports WHERE (source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0'))" + return 1 +} + +wait_for_exports 60 # ============================================================================ # ALL SELECTS/VERIFICATIONS HAPPEN HERE