From 3abe636caa800173babf4d2f1c7a624bec9f1b96 Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Thu, 18 Jun 2026 00:00:39 +0800 Subject: [PATCH 1/3] [core] Fix nested update agg dropping existing-key updates at count limit When a nested-key was configured together with a count limit, `agg` returned the accumulator unchanged as soon as `acc.size() >= countLimit`. This early return was shared with the no-key path, so once the nested array reached the limit, updates to already-existing keys (and sequence-based updates) were silently dropped instead of being applied. Split the two paths: the no-key path keeps the count-limit truncation behavior, while the keyed path merges rows by key into a map. Existing keys are always updated (honoring the nested-sequence-field when set) and the count limit only caps the number of new keys, so in-place updates are no longer lost at the limit. Signed-off-by: QuakeWang --- .../aggregate/FieldNestedUpdateAgg.java | 60 +++++++++------- .../aggregate/FieldAggregatorTest.java | 69 +++++++++++++++++++ 2 files changed, 104 insertions(+), 25 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 13b19e6e9ba1..c7dd31481a6c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -123,35 +123,23 @@ public Object agg(Object accumulator, Object inputField) { InternalArray acc = (InternalArray) accumulator; InternalArray input = (InternalArray) inputField; - if (acc.size() >= countLimit) { - return accumulator; - } - - int remainCount = countLimit - acc.size(); - - List rows = new ArrayList<>(acc.size() + input.size()); - addNonNullRows(acc, rows); - addNonNullRows(input, rows, remainCount); - - if (keyProjection != null) { - Map map = new HashMap<>(); - for (InternalRow row : rows) { - BinaryRow key = keyProjection.apply(row).copy(); - if (hasSequenceField) { - // When sequence field is configured, only update if the new sequence is greater - InternalRow existing = map.get(key); - if (existing == null || compareSequence(row, existing) >= 0) { - map.put(key, row); - } - } else { - map.put(key, row); - } + if (keyProjection == null) { + if (acc.size() >= countLimit) { + return accumulator; } - rows = new ArrayList<>(map.values()); + int remainCount = countLimit - acc.size(); + + List rows = new ArrayList<>(acc.size() + input.size()); + addNonNullRows(acc, rows); + addNonNullRows(input, rows, remainCount); + return new GenericArray(rows.toArray()); } - return new GenericArray(rows.toArray()); + Map map = new HashMap<>(); + addNestedRows(acc, map, false); + addNestedRows(input, map, true); + return new GenericArray(new ArrayList<>(map.values()).toArray()); } @Override @@ -235,4 +223,26 @@ private void addNonNullRows(InternalArray array, List rows, int rem count++; } } + + private void addNestedRows( + InternalArray array, Map rows, boolean limitNewKeys) { + checkNotNull(keyProjection); + + for (int i = 0; i < array.size(); i++) { + if (array.isNullAt(i)) { + continue; + } + + InternalRow row = array.getRow(i, nestedFields); + BinaryRow key = keyProjection.apply(row).copy(); + InternalRow existing = rows.get(key); + if (existing != null) { + if (!hasSequenceField || compareSequence(row, existing) >= 0) { + rows.put(key, row); + } + } else if (!limitNewKeys || rows.size() < countLimit) { + rows.put(key, row); + } + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 5496503b77be..cf6c0a9ee37e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -830,6 +830,39 @@ public void testFieldNestedAppendAggWithCountLimit() { .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, "B"), row(0, 1, "b"))); } + @Test + public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimitWithoutSequence() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + 2); + + InternalArray accumulator = null; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B"))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C"))); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated"))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated"), row(1, 2, "C"))); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(2, 3, "D"))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated"), row(1, 2, "C"))); + } + @Test public void testFieldNestedUpdateAggWithSequenceField() { DataType elementRowType = @@ -1076,6 +1109,42 @@ public void testFieldNestedUpdateAggWithCountLimitWithSequenceField() { Arrays.asList(row(0, 1, "B_updated", 2), row(1, 2, "C", 3))); } + @Test + public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimit() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + Collections.singletonList("seq"), + 2); + + InternalArray accumulator = null; + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B", 1))); + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(1, 2, "C", 3))); + + accumulator = + (InternalArray) agg.agg(accumulator, singletonArray(row(0, 1, "B_updated", 4))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); + + accumulator = (InternalArray) agg.agg(accumulator, singletonArray(row(2, 3, "D", 5))); + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); + } + private List unnest(InternalArray array, InternalArray.ElementGetter elementGetter) { return IntStream.range(0, array.size()) .mapToObj(i -> elementGetter.getElementOrNull(array, i)) From 2bebe797881b0746bbfbaba80bba10bd2b9f88c3 Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Fri, 19 Jun 2026 08:55:26 +0800 Subject: [PATCH 2/3] Fix nested update count limit on initial keyed input --- .../aggregate/FieldNestedUpdateAgg.java | 14 ++-- .../aggregate/FieldAggregatorTest.java | 70 +++++++++++++++++++ 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index c7dd31481a6c..f2b1efb58326 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -116,14 +116,18 @@ public FieldNestedUpdateAgg( @Override public Object agg(Object accumulator, Object inputField) { - if (accumulator == null || inputField == null) { - return accumulator == null ? inputField : accumulator; + if (inputField == null) { + return accumulator; } - InternalArray acc = (InternalArray) accumulator; InternalArray input = (InternalArray) inputField; if (keyProjection == null) { + if (accumulator == null) { + return inputField; + } + + InternalArray acc = (InternalArray) accumulator; if (acc.size() >= countLimit) { return accumulator; } @@ -137,7 +141,9 @@ public Object agg(Object accumulator, Object inputField) { } Map map = new HashMap<>(); - addNestedRows(acc, map, false); + if (accumulator != null) { + addNestedRows((InternalArray) accumulator, map, false); + } addNestedRows(input, map, true); return new GenericArray(new ArrayList<>(map.values()).toArray()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index cf6c0a9ee37e..9f05f2ac74d4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -863,6 +863,38 @@ public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimitWitho Arrays.asList(row(0, 1, "B_updated"), row(1, 2, "C"))); } + @Test + public void testFieldNestedUpdateAggWithCountLimitOnFirstInputArrayWithoutSequence() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + 2); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + InternalArray accumulator = + (InternalArray) + agg.agg( + null, + array( + row(0, 1, "B"), + row(1, 2, "C"), + row(2, 3, "D"), + row(0, 1, "B_updated"))); + + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated"), row(1, 2, "C"))); + } + @Test public void testFieldNestedUpdateAggWithSequenceField() { DataType elementRowType = @@ -1145,12 +1177,50 @@ public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimit() { Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); } + @Test + public void testFieldNestedUpdateAggWithCountLimitOnFirstInputArrayWithSequence() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING()), + DataTypes.FIELD(3, "seq", DataTypes.INT())); + + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Arrays.asList("k0", "k1"), + Collections.singletonList("seq"), + 2); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + InternalArray accumulator = + (InternalArray) + agg.agg( + null, + array( + row(0, 1, "B", 1), + row(1, 2, "C", 3), + row(2, 3, "D", 5), + row(0, 1, "B_updated", 4))); + + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList(row(0, 1, "B_updated", 4), row(1, 2, "C", 3))); + } + private List unnest(InternalArray array, InternalArray.ElementGetter elementGetter) { return IntStream.range(0, array.size()) .mapToObj(i -> elementGetter.getElementOrNull(array, i)) .collect(Collectors.toList()); } + private GenericArray array(InternalRow... rows) { + return new GenericArray(rows); + } + private GenericArray singletonArray(InternalRow row) { return new GenericArray(new InternalRow[] {row}); } From b47eb699382e241479efabb13a8b6d8020231f99 Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Sat, 20 Jun 2026 14:30:16 +0800 Subject: [PATCH 3/3] [core] Apply nested append count limit on initial input --- .../aggregate/FieldNestedUpdateAgg.java | 4 +++- .../aggregate/FieldAggregatorTest.java | 24 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index f2b1efb58326..03c4c78500a0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -124,7 +124,9 @@ public Object agg(Object accumulator, Object inputField) { if (keyProjection == null) { if (accumulator == null) { - return inputField; + List rows = new ArrayList<>(input.size()); + addNonNullRows(input, rows, countLimit); + return new GenericArray(rows.toArray()); } InternalArray acc = (InternalArray) accumulator; diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 9f05f2ac74d4..c791fd149cdb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -830,6 +830,30 @@ public void testFieldNestedAppendAggWithCountLimit() { .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, "B"), row(0, 1, "b"))); } + @Test + public void testFieldNestedAppendAggWithCountLimitOnFirstInputArray() { + DataType elementRowType = + DataTypes.ROW( + DataTypes.FIELD(0, "k0", DataTypes.INT()), + DataTypes.FIELD(1, "k1", DataTypes.INT()), + DataTypes.FIELD(2, "v", DataTypes.STRING())); + FieldNestedUpdateAgg agg = + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.emptyList(), + 2); + + InternalArray.ElementGetter elementGetter = + InternalArray.createElementGetter(elementRowType); + InternalArray accumulator = + (InternalArray) + agg.agg(null, array(row(0, 1, "B"), null, row(0, 1, "b"), row(0, 1, "C"))); + + assertThat(unnest(accumulator, elementGetter)) + .containsExactlyInAnyOrderElementsOf(Arrays.asList(row(0, 1, "B"), row(0, 1, "b"))); + } + @Test public void testFieldNestedUpdateAggWithCountLimitUpdatesExistingKeyAtLimitWithoutSequence() { DataType elementRowType =