Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 63 additions & 21 deletions src/Storages/IPartitionStrategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ extern const int BAD_ARGUMENTS;

namespace
{
using PartitionExpressionActionsAndColumnName = IPartitionStrategy::PartitionExpressionActionsAndColumnName;

/// Builds AST for hive partition path format
/// `partition_column_1=toString(partition_value_expr_1)/ ... /partition_column_N=toString(partition_value_expr_N)/`
/// for given partition columns list and a partition by AST.
Expand Down Expand Up @@ -89,6 +91,33 @@ namespace
return result;
}

/// Wraps the partition-by expression into `toString(<expr>)` so the wildcard
/// strategy always produces a single string-typed partition key column.
ASTPtr buildToStringPartitionAST(ASTPtr partition_by)
{
    return makeASTFunction("toString", ASTs{std::move(partition_by)});
}

/// Returns the cached expression actions when available; otherwise builds the
/// AST via `build_ast` and compiles fresh actions through the strategy.
/// NOTE: the freshly built result is NOT cached here — callers decide whether
/// it is safe to cache (see cacheDeterministicActions).
template <typename BuildAST>
PartitionExpressionActionsAndColumnName getCachedOrBuildActions(
    const std::optional<PartitionExpressionActionsAndColumnName> & cached_result,
    const IPartitionStrategy & partition_strategy,
    BuildAST && build_ast)
{
    if (!cached_result)
    {
        /// getPartitionExpressionActions takes the AST by non-const reference,
        /// so keep it in a named lvalue.
        auto expression_ast = build_ast();
        return partition_strategy.getPartitionExpressionActions(expression_ast);
    }

    return *cached_result;
}

/// Stores the compiled actions in the cache, but only for deterministic
/// expressions — non-deterministic ones (e.g. involving now()/rand()) must be
/// re-evaluated on every use and therefore stay uncached.
void cacheDeterministicActions(
    std::optional<PartitionExpressionActionsAndColumnName> & cached_result,
    const PartitionExpressionActionsAndColumnName & actions_with_column)
{
    const auto & dag = actions_with_column.actions->getActionsDAG();
    if (dag.hasNonDeterministic())
        return;

    cached_result = actions_with_column;
}

std::shared_ptr<IPartitionStrategy> createHivePartitionStrategy(
ASTPtr partition_by,
const Block & sample_block,
Expand Down Expand Up @@ -191,11 +220,8 @@ const KeyDescription & IPartitionStrategy::getPartitionKeyDescription() const
}

IPartitionStrategy::PartitionExpressionActionsAndColumnName
IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast)
IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast) const
{
if (cached_result)
return *cached_result;

auto syntax_result = TreeRewriter(context).analyze(expression_ast, sample_block.getNamesAndTypesList());
auto actions_dag = ExpressionAnalyzer(expression_ast, syntax_result, context).getActionsDAG(false);

Expand All @@ -204,9 +230,6 @@ IPartitionStrategy::getPartitionExpressionActions(ASTPtr & expression_ast)
std::move(actions_dag), ExpressionActionsSettings(context), false);
result.column_name = expression_ast->getColumnName();

if (!result.actions->getActionsDAG().hasNonDeterministic())
cached_result = result;

return result;
}

Expand Down Expand Up @@ -259,13 +282,19 @@ std::shared_ptr<IPartitionStrategy> PartitionStrategyFactory::get(StrategyType s
WildcardPartitionStrategy::WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_)
: IPartitionStrategy(partition_key_description_, sample_block_, context_)
{
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildToStringPartitionAST(partition_key_description.definition_ast); });
cacheDeterministicActions(cached_result, actions_with_column);
}

ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk)
ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) const
{
ASTs arguments(1, partition_key_description.definition_ast);
ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments));
auto actions_with_column = getPartitionExpressionActions(partition_by_string);
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildToStringPartitionAST(partition_key_description.definition_ast); });

Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(chunk.getColumns());
Expand All @@ -274,11 +303,13 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk)
return block_with_partition_by_expr.getByName(actions_with_column.column_name).column;
}

ColumnPtr WildcardPartitionStrategy::computePartitionKey(Block & block)
ColumnPtr WildcardPartitionStrategy::computePartitionKey(Block & block) const
{
ASTs arguments(1, partition_key_description.definition_ast);
ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments));
auto actions_with_column = getPartitionExpressionActions(partition_by_string);
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildToStringPartitionAST(partition_key_description.definition_ast); });

actions_with_column.actions->execute(block);
return block.getByName(actions_with_column.column_name).column;
}
Expand All @@ -300,12 +331,20 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy(
}

block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set);

auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); });
cacheDeterministicActions(cached_result, actions_with_column);
}

ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk)
ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) const
{
auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns());
auto actions_with_column = getPartitionExpressionActions(hive_ast);
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); });

Block block_with_partition_by_expr = sample_block.cloneWithoutColumns();
block_with_partition_by_expr.setColumns(chunk.getColumns());
Expand All @@ -314,10 +353,13 @@ ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk)
return block_with_partition_by_expr.getByName(actions_with_column.column_name).column;
}

ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block)
ColumnPtr HiveStylePartitionStrategy::computePartitionKey(Block & block) const
{
auto hive_ast = buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns());
auto actions_with_column = getPartitionExpressionActions(hive_ast);
auto actions_with_column = getCachedOrBuildActions(
cached_result,
*this,
[&] { return buildHivePartitionAST(partition_key_description.definition_ast, getPartitionColumns()); });

actions_with_column.actions->execute(block);
return block.getByName(actions_with_column.column_name).column;
}
Expand Down
14 changes: 7 additions & 7 deletions src/Storages/IPartitionStrategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ struct IPartitionStrategy

virtual ~IPartitionStrategy() = default;

virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0;
virtual ColumnPtr computePartitionKey(const Chunk & chunk) const = 0;

virtual ColumnPtr computePartitionKey(Block & block) = 0;
virtual ColumnPtr computePartitionKey(Block & block) const = 0;

virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk)
{
Expand All @@ -48,7 +48,7 @@ struct IPartitionStrategy
NamesAndTypesList getPartitionColumns() const;
const KeyDescription & getPartitionKeyDescription() const;

PartitionExpressionActionsAndColumnName getPartitionExpressionActions(ASTPtr & expression_ast);
PartitionExpressionActionsAndColumnName getPartitionExpressionActions(ASTPtr & expression_ast) const;

protected:
const KeyDescription partition_key_description;
Expand Down Expand Up @@ -91,9 +91,9 @@ struct WildcardPartitionStrategy : IPartitionStrategy
{
WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_);

ColumnPtr computePartitionKey(const Chunk & chunk) override;
ColumnPtr computePartitionKey(const Chunk & chunk) const override;

ColumnPtr computePartitionKey(Block & block) override;
ColumnPtr computePartitionKey(Block & block) const override;
};

/*
Expand All @@ -110,9 +110,9 @@ struct HiveStylePartitionStrategy : IPartitionStrategy
const std::string & file_format_,
bool partition_columns_in_data_file_);

ColumnPtr computePartitionKey(const Chunk & chunk) override;
ColumnPtr computePartitionKey(const Chunk & chunk) const override;

ColumnPtr computePartitionKey(Block & block) override;
ColumnPtr computePartitionKey(Block & block) const override;

ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override;
Block getFormatHeader() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ def test_drop_column_during_export_snapshot(cluster):
skip_if_remote_database_disk_enabled(cluster)
node = cluster.instances["node1"]

mt_table = "mutations_snapshot_mt_table"
s3_table = "mutations_snapshot_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"mutations_snapshot_mt_table_{postfix}"
s3_table = f"mutations_snapshot_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down Expand Up @@ -104,8 +106,10 @@ def test_add_column_during_export(cluster):
skip_if_remote_database_disk_enabled(cluster)
node = cluster.instances["node1"]

mt_table = "add_column_during_export_mt_table"
s3_table = "add_column_during_export_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"add_column_during_export_mt_table_{postfix}"
s3_table = f"add_column_during_export_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down Expand Up @@ -157,8 +161,10 @@ def test_pending_mutations_throw_before_export(cluster):
"""Test that pending mutations before export throw an error with default settings."""
node = cluster.instances["node1"]

mt_table = "pending_mutations_throw_mt_table"
s3_table = "pending_mutations_throw_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"pending_mutations_throw_mt_table_{postfix}"
s3_table = f"pending_mutations_throw_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand All @@ -180,8 +186,10 @@ def test_pending_mutations_skip_before_export(cluster):
"""Test that pending mutations before export are skipped with throw_on_pending_mutations=false."""
node = cluster.instances["node1"]

mt_table = "pending_mutations_skip_mt_table"
s3_table = "pending_mutations_skip_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"pending_mutations_skip_mt_table_{postfix}"
s3_table = f"pending_mutations_skip_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down Expand Up @@ -210,8 +218,10 @@ def test_data_mutations_after_export_started(cluster):
skip_if_remote_database_disk_enabled(cluster)
node = cluster.instances["node1"]

mt_table = "mutations_after_export_mt_table"
s3_table = "mutations_after_export_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"mutations_after_export_mt_table_{postfix}"
s3_table = f"mutations_after_export_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down Expand Up @@ -256,8 +266,10 @@ def test_pending_patch_parts_throw_before_export(cluster):
"""Test that pending patch parts before export throw an error with default settings."""
node = cluster.instances["node1"]

mt_table = "pending_patches_throw_mt_table"
s3_table = "pending_patches_throw_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"pending_patches_throw_mt_table_{postfix}"
s3_table = f"pending_patches_throw_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand All @@ -279,8 +291,10 @@ def test_pending_patch_parts_skip_before_export(cluster):
"""Test that pending patch parts before export are skipped with throw_on_pending_patch_parts=false."""
node = cluster.instances["node1"]

mt_table = "pending_patches_skip_mt_table"
s3_table = "pending_patches_skip_s3_table"
postfix = str(uuid.uuid4()).replace("-", "_")

mt_table = f"pending_patches_skip_mt_table_{postfix}"
s3_table = f"pending_patches_skip_s3_table_{postfix}"

create_tables_and_insert_data(node, mt_table, s3_table)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
---- Test max_bytes and max_rows per file
---- Table function with schema inheritance (no schema specified)
---- Table function with explicit compatible schema
Waiting for exports to complete (timeout: 60s)...
All exports completed.
---- Count files in big_destination_max_bytes, should be 5 (4 parquet, 1 commit)
5
---- Count rows in big_table and big_destination_max_bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,35 @@ query "ALTER TABLE $mt_table_tf EXPORT PART '2022_1_1_0' TO TABLE FUNCTION s3(s3
echo "---- Table function with explicit compatible schema"
query "ALTER TABLE $mt_table_tf EXPORT PART '2023_2_2_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_explicit', format='Parquet', structure='id UInt64, value String, year UInt16', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1"

# ONE BIG SLEEP after all exports (longer because it writes multiple files)
sleep 20
# Wait for all exports to complete
# Poll system.exports until all tracked exports finish, or fail after a timeout.
#   $1 - timeout in seconds (default 60)
#   $2 - poll interval in seconds (default 0.5)
# Returns 0 when no tracked exports remain, 1 on timeout (and dumps the
# remaining exports for debugging).
wait_for_exports() {
    local timeout=${1:-60}
    local poll_interval=${2:-0.5}
    # Declare and assign separately: `local var=$(cmd)` would mask the
    # command's exit status (shellcheck SC2155).
    local start_time
    start_time=$(date +%s)
    local elapsed=0
    local active_exports

    echo "Waiting for exports to complete (timeout: ${timeout}s)..."

    while [ "$elapsed" -lt "$timeout" ]; do
        # Check if any exports are still in progress for our tables/parts
        active_exports=$(query "SELECT count() FROM system.exports WHERE (source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0'))" | tr -d '\n')

        if [ "$active_exports" = "0" ]; then
            echo "All exports completed."
            return 0
        fi

        sleep "$poll_interval"
        elapsed=$(($(date +%s) - start_time))
    done

    echo "Timeout waiting for exports to complete after ${timeout}s"
    echo "Remaining exports:"
    query "SELECT source_table, part_name, elapsed, rows_read, total_rows_to_read FROM system.exports WHERE (source_table = '$big_table' AND part_name IN ('$big_part_max_bytes', '$big_part_max_rows')) OR (source_table = '$mt_table_tf' AND part_name IN ('2022_1_1_0', '2023_2_2_0'))"
    return 1
}

wait_for_exports 60

# ============================================================================
# ALL SELECTS/VERIFICATIONS HAPPEN HERE
Expand Down
Loading