From 98e43d3e8207034ba177fb374ea67e5f20b32781 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 1 May 2026 11:30:40 +0800 Subject: [PATCH 1/3] [flink] Fix StackOverflowError in BTreeIndexTopoBuilder caused by chained union --- .../flink/btree/BTreeIndexTopoBuilder.java | 14 ++-- .../paimon/flink/BTreeGlobalIndexITCase.java | 64 +++++++++++++++++++ 2 files changed, 71 insertions(+), 7 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index dbdfdc8fe06e..b03a6cdf7a2f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -85,7 +85,7 @@ public static boolean buildIndex( PartitionPredicate partitionPredicate, Options userOptions) throws Exception { - DataStream allCommitMessages = null; + List> allStreams = new ArrayList<>(); for (String indexColumn : indexColumns) { BTreeGlobalIndexBuilder indexBuilder = indexBuilderSupplier.get().withIndexField(indexColumn); @@ -160,15 +160,15 @@ public static boolean buildIndex( recordsPerRange, maxParallelism); - allCommitMessages = - allCommitMessages == null - ? commitMessages - : allCommitMessages.union(commitMessages); + allStreams.add(commitMessages); } } } - if (allCommitMessages != null) { - commit(table, allCommitMessages); + if (!allStreams.isEmpty()) { + @SuppressWarnings("unchecked") + DataStream[] rest = + allStreams.subList(1, allStreams.size()).toArray(new DataStream[0]); + commit(table, allStreams.get(0).union(rest)); } return true; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java index d6368da23f01..3a8835fe2d21 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java @@ -23,11 +23,14 @@ import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.table.FileStoreTable; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -140,4 +143,65 @@ private void buildBTreeIndexForTable(String tableName, String indexColumn) { "CALL sys.create_global_index(`table` => 'default.%s', index_column => '%s', index_type => 'btree')", tableName, indexColumn); } + + @Test + void testChainedUnionOverflowAndFlatUnionFix() throws InterruptedException { + int totalUnions = 8 * 200; // 8 index columns × 200 partitions + long stackSize = 512 * 1024; // Flink JM default + + // Chained union: result = result.union(new) — causes StackOverflowError + AtomicReference chainedError = new AtomicReference<>(); + Thread chainedThread = + new Thread( + null, + () -> { + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream all = null; + for (int i = 0; i < totalUnions; i++) { + DataStream s = env.fromElements("item-" + i); + all = all == null ? s : all.union(s); + } + all.print(); + env.getExecutionPlan(); + } catch (Throwable t) { + chainedError.set(t); + } + }, + "chained-union-test", + stackSize); + chainedThread.start(); + chainedThread.join(); + assertThat(chainedError.get()).isInstanceOf(StackOverflowError.class); + + // Flat union: first.union(rest...) — no overflow at same stack size + AtomicReference flatError = new AtomicReference<>(); + Thread flatThread = + new Thread( + null, + () -> { + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + @SuppressWarnings("unchecked") + DataStream[] streams = new DataStream[totalUnions]; + for (int i = 0; i < totalUnions; i++) { + streams[i] = env.fromElements("item-" + i); + } + @SuppressWarnings("unchecked") + DataStream[] rest = new DataStream[totalUnions - 1]; + System.arraycopy(streams, 1, rest, 0, totalUnions - 1); + streams[0].union(rest).print(); + env.getExecutionPlan(); + } catch (Throwable t) { + flatError.set(t); + } + }, + "flat-union-test", + stackSize); + flatThread.start(); + flatThread.join(); + assertThat(flatError.get()).isNull(); + } } From b4be437e010bc2628ee3acfbd45604a94d590df2 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 1 May 2026 14:31:00 +0800 Subject: [PATCH 2/3] optimize test case --- .../paimon/flink/BTreeGlobalIndexITCase.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java index 3a8835fe2d21..e2acaad2f3a7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java @@ -145,8 +145,36 @@ private void buildBTreeIndexForTable(String tableName, String indexColumn) { } @Test - void testChainedUnionOverflowAndFlatUnionFix() throws InterruptedException { - int totalUnions = 8 * 200; // 8 index columns × 200 partitions + public void testBTreeIndexWithManyPartitions() throws Catalog.TableNotExistException { + // Regression test: building a btree index on a table with many partitions + // previously caused StackOverflowError due to deeply nested DataStream.union() calls. + int numPartitions = 50; + sql( + "CREATE TABLE T_MANY_PT (pt INT, id INT, name STRING) PARTITIONED BY (pt) WITH (" + + "'global-index.enabled' = 'true', " + + "'row-tracking.enabled' = 'true', " + + "'data-evolution.enabled' = 'true'" + + ")"); + + for (int p = 0; p < numPartitions; p++) { + insertPartitionRows("T_MANY_PT", p, p * 2, 2, "r_"); + } + + buildBTreeIndexForTable("T_MANY_PT", "id"); + + FileStoreTable table = paimonTable("T_MANY_PT"); + long totalRowCount = + table.store().newIndexFileHandler().scanEntries().stream() + .filter(e -> "btree".equals(e.indexFile().indexType())) + .map(IndexManifestEntry::indexFile) + .mapToLong(IndexFileMeta::rowCount) + .sum(); + assertThat(totalRowCount).isEqualTo((long) numPartitions * 2); + } + + @Test + void testUnionDoesNotStackOverflow() throws InterruptedException { + int totalUnions = 1000; long stackSize = 512 * 1024; // Flink JM default // Chained union: result = result.union(new) — causes StackOverflowError From 1ef245ba3c838ef9bb22b3d8846f794d3b9cf080 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Fri, 1 May 2026 14:32:17 +0800 Subject: [PATCH 3/3] clean code --- .../java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java index e2acaad2f3a7..9aaa39e446f2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java @@ -145,9 +145,7 @@ private void buildBTreeIndexForTable(String tableName, String indexColumn) { } @Test - public void testBTreeIndexWithManyPartitions() throws Catalog.TableNotExistException { - // Regression test: building a btree index on a table with many partitions - // previously caused StackOverflowError due to deeply nested DataStream.union() calls. + void testBTreeIndexWithManyPartitions() throws Catalog.TableNotExistException { int numPartitions = 50; sql( "CREATE TABLE T_MANY_PT (pt INT, id INT, name STRING) PARTITIONED BY (pt) WITH ("