Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ private static void validateFieldsPrefix(TableSchema schema, CoreOptions options
.forEach(
k -> {
if (k.startsWith(FIELDS_PREFIX)) {
String[] fields = k.split("\\.")[1].split(FIELDS_SEPARATOR);
String[] segments = k.split("\\.");
String[] fields = segments[1].split(FIELDS_SEPARATOR);
for (String field : fields) {
checkArgument(
DEFAULT_AGG_FUNCTION.equals(field)
Expand All @@ -496,6 +497,23 @@ private static void validateFieldsPrefix(TableSchema schema, CoreOptions options
"Field %s can not be found in table schema.",
field));
}
// PAIMON-6471: dot paths into a ROW field's members
// (e.g. fields.row.inner.aggregate-function) are silently
// dropped today and produce wrong results. This check
// rejects them.
if (segments.length > 3 && fields.length == 1) {
String parent = fields[0];
schema.fields().stream()
.filter(f -> f.name().equals(parent))
.findFirst()
.ifPresent(
f ->
checkArgument(
!(f.type() instanceof RowType),
"Nested-field path is not supported on ROW field '%s': %s",
parent,
k));
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,34 @@ public void testPartialUpdateTableAggregateFunctionWithoutSequenceGroup() {
assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
}

@Test
public void testNestedRowAggregateOptionRejected() {
// PAIMON-6471: nested-path agg config was silently ignored, leaving the parent ROW
// on the default aggregator and producing wrong results without any error.
List<DataField> fields =
Arrays.asList(
new DataField(0, "id", DataTypes.INT()),
new DataField(
1,
"data",
DataTypes.ROW(
DataTypes.FIELD(2, "num", DataTypes.INT()),
DataTypes.FIELD(3, "info", DataTypes.STRING()))));

Map<String, String> options = new HashMap<>();
options.put("merge-engine", "aggregation");
options.put("fields.data.num.aggregate-function", "sum");
options.put("fields.data.sequence-group", "data");
options.put(BUCKET.key(), "1");

TableSchema schema =
new TableSchema(1, fields, 10, emptyList(), singletonList("id"), options, "");

assertThatThrownBy(() -> validateTableSchema(schema))
.hasMessageContaining("Nested-field path is not supported on ROW field 'data'")
.hasMessageContaining("fields.data.num.aggregate-function");
}

@Test
public void testChainTableAllowsNonDeduplicateMergeEngine() {
Map<String, String> options = new HashMap<>();
Expand Down