diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java new file mode 100644 index 0000000000000..de90dd2009eba --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; +import static org.junit.Assert.fail; + +/** + * Integration tests for the LimitKRanking optimization. When using {@code ROW_NUMBER() OVER + * (PARTITION BY device ORDER BY time)}, the planner should produce a streaming LimitKRankingNode + * instead of the buffered TopKRankingNode, since data is already sorted by (device, time). 
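+ * <p>A representative query shape exercised by the tests below (taken verbatim from
+ * {@code testFilterPushDownOrderByTime}):
+ *
+ * <pre>{@code
+ * SELECT * FROM (
+ *   SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo
+ * ) WHERE rn <= 2 ORDER BY device, time
+ * }</pre>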
+ */ +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBLimitKRankingIT { + private static final String DATABASE_NAME = "test"; + private static final String[] sqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "create table demo (device string tag, value double field)", + // d1: 4 rows + "insert into demo values (2021-01-01T09:05:00, 'd1', 3)", + "insert into demo values (2021-01-01T09:07:00, 'd1', 5)", + "insert into demo values (2021-01-01T09:09:00, 'd1', 3)", + "insert into demo values (2021-01-01T09:10:00, 'd1', 1)", + // d2: 2 rows + "insert into demo values (2021-01-01T09:08:00, 'd2', 2)", + "insert into demo values (2021-01-01T09:15:00, 'd2', 4)", + // d3: 5 rows + "insert into demo values (2021-01-01T09:01:00, 'd3', 10)", + "insert into demo values (2021-01-01T09:02:00, 'd3', 20)", + "insert into demo values (2021-01-01T09:03:00, 'd3', 30)", + "insert into demo values (2021-01-01T09:04:00, 'd3', 40)", + "insert into demo values (2021-01-01T09:06:00, 'd3', 50)", + "FLUSH", + "CLEAR ATTRIBUTE CACHE", + }; + + protected static void insertData() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : sqls) { + statement.execute(sql); + } + } catch (Exception e) { + fail("insertData failed."); + } + } + + @BeforeClass + public static void setUp() { + EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 1024); + EnvFactory.getEnv().initClusterEnvironment(); + insertData(); + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testFilterPushDownOrderByTime() { + // ROW_NUMBER() OVER (PARTITION BY device ORDER BY time) WHERE rn <= 2 + // Should use LimitKRankingOperator: take first 2 time-ordered rows per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,1,", + "2021-01-01T09:07:00.000Z,d1,5.0,2,", + "2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + "2021-01-01T09:01:00.000Z,d3,10.0,1,", + "2021-01-01T09:02:00.000Z,d3,20.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo" + + ") WHERE rn <= 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeK1() { + // K=1: only the earliest row per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:01:00.000Z,d3,10.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo" + + ") WHERE rn <= 1 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeKLargerThanData() { + // K=100: larger than any partition, so all rows are returned + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,1,", + "2021-01-01T09:07:00.000Z,d1,5.0,2,", + "2021-01-01T09:09:00.000Z,d1,3.0,3,", + "2021-01-01T09:10:00.000Z,d1,1.0,4,", + 
"2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + "2021-01-01T09:01:00.000Z,d3,10.0,1,", + "2021-01-01T09:02:00.000Z,d3,20.0,2,", + "2021-01-01T09:03:00.000Z,d3,30.0,3,", + "2021-01-01T09:04:00.000Z,d3,40.0,4,", + "2021-01-01T09:06:00.000Z,d3,50.0,5,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo" + + ") WHERE rn <= 100 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testLimitPushDownOrderByTime() { + // LIMIT pushdown: row_number() OVER (PARTITION BY device ORDER BY time) LIMIT 4 + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,1,", + "2021-01-01T09:07:00.000Z,d1,5.0,2,", + "2021-01-01T09:09:00.000Z,d1,3.0,3,", + "2021-01-01T09:10:00.000Z,d1,1.0,4,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo" + + ") ORDER BY device, time LIMIT 4", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeWithLessThan() { + // Use rn < 3 instead of rn <= 2 (should give same result as rn <= 2) + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,1,", + "2021-01-01T09:07:00.000Z,d1,5.0,2,", + "2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + "2021-01-01T09:01:00.000Z,d3,10.0,1,", + "2021-01-01T09:02:00.000Z,d3,20.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo" + + ") WHERE rn < 3 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeSelectSubsetColumns() { + // Only select time and device (not the ranking column) + String[] expectedHeader = new String[] {"time", "device", "value"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,", + "2021-01-01T09:07:00.000Z,d1,5.0,", + "2021-01-01T09:09:00.000Z,d1,3.0,", + "2021-01-01T09:08:00.000Z,d2,2.0,", + "2021-01-01T09:15:00.000Z,d2,4.0,", + "2021-01-01T09:01:00.000Z,d3,10.0,", + "2021-01-01T09:02:00.000Z,d3,20.0,", + "2021-01-01T09:03:00.000Z,d3,30.0,", + }; + tableResultSetEqualTest( + "SELECT time, device, value FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo" + + ") WHERE rn <= 3 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeEqual() { + // rn = 2: only the second row per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:07:00.000Z,d1,5.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + "2021-01-01T09:02:00.000Z,d3,20.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo" + + ") WHERE rn = 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + // ==================== ORDER BY time DESC tests ==================== + + @Test + public void testFilterPushDownOrderByTimeDesc() { + // ROW_NUMBER() OVER (PARTITION BY device ORDER BY time DESC) WHERE rn <= 2 + // Should use LimitKRankingOperator: take latest 2 rows per 
device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + "2021-01-01T09:04:00.000Z,d3,40.0,2,", + "2021-01-01T09:06:00.000Z,d3,50.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn <= 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeDescK1() { + // K=1: only the latest row per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + "2021-01-01T09:06:00.000Z,d3,50.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn <= 1 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeDescKLargerThanData() { + // K=100: larger than any partition, so all rows are returned + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,4,", + "2021-01-01T09:07:00.000Z,d1,5.0,3,", + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + "2021-01-01T09:01:00.000Z,d3,10.0,5,", + "2021-01-01T09:02:00.000Z,d3,20.0,4,", + "2021-01-01T09:03:00.000Z,d3,30.0,3,", + "2021-01-01T09:04:00.000Z,d3,40.0,2,", + "2021-01-01T09:06:00.000Z,d3,50.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn <= 100 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testLimitPushDownOrderByTimeDesc() { + // LIMIT pushdown: row_number() OVER (PARTITION BY device ORDER BY time DESC) LIMIT 4 + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:07:00.000Z,d1,5.0,3,", + "2021-01-01T09:05:00.000Z,d1,3.0,4,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") ORDER BY device, time DESC LIMIT 4", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeDescWithLessThan() { + // Use rn < 3 instead of rn <= 2 (should give same result as rn <= 2) + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:15:00.000Z,d2,4.0,1,", + "2021-01-01T09:04:00.000Z,d3,40.0,2,", + "2021-01-01T09:06:00.000Z,d3,50.0,1,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn < 3 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public 
void testFilterPushDownOrderByTimeDescSelectSubsetColumns() { + // Only select time, device, value (not the ranking column) + String[] expectedHeader = new String[] {"time", "device", "value"}; + String[] retArray = + new String[] { + "2021-01-01T09:07:00.000Z,d1,5.0,", + "2021-01-01T09:09:00.000Z,d1,3.0,", + "2021-01-01T09:10:00.000Z,d1,1.0,", + "2021-01-01T09:08:00.000Z,d2,2.0,", + "2021-01-01T09:15:00.000Z,d2,4.0,", + "2021-01-01T09:03:00.000Z,d3,30.0,", + "2021-01-01T09:04:00.000Z,d3,40.0,", + "2021-01-01T09:06:00.000Z,d3,50.0,", + }; + tableResultSetEqualTest( + "SELECT time, device, value FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn <= 3 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testFilterPushDownOrderByTimeDescEqual() { + // rn = 2: only the second-latest row per device + String[] expectedHeader = new String[] {"time", "device", "value", "rn"}; + String[] retArray = + new String[] { + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:08:00.000Z,d2,2.0,2,", + "2021-01-01T09:04:00.000Z,d3,40.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (" + + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo" + + ") WHERE rn = 2 ORDER BY device, time", + expectedHeader, + retArray, + DATABASE_NAME); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java new file mode 100644 index 0000000000000..11f75ab9ddee4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; +import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.createGroupByHash; + +/** + * Streaming operator optimized for {@code PARTITION BY device ORDER BY time} with a top-K filter. + * + *
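+ * <p>Illustrative example (per the semantics described below): with
+ * {@code maxRowCountPerPartition = 2} and already-sorted input rows
+ * (d1, t1), (d1, t2), (d1, t3), (d2, t4), the operator emits (d1, t1), (d1, t2), (d2, t4),
+ * and, when row numbers are produced, assigns 1, 2, 1 respectively.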

Unlike {@link TopKRankingOperator} which buffers all input and uses heap-based selection, this + * operator exploits the fact that input data is already sorted by (device, time). It simply counts + * rows per partition and emits the first K rows for each, discarding the rest. This yields O(N) + * time with minimal memory overhead, compared to O(N log K) for the general TopK path. + */ +public class LimitKRankingOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LimitKRankingOperator.class); + + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final List inputDataTypes; + + private final List outputChannels; + private final List partitionChannels; + private final int maxRowCountPerPartition; + private final boolean produceRowNumber; + + private final Optional groupByHash; + private final Map partitionRowCounts; + private final TsBlockBuilder tsBlockBuilder; + + public LimitKRankingOperator( + OperatorContext operatorContext, + Operator inputOperator, + List inputDataTypes, + List outputChannels, + List partitionChannels, + List partitionTSDataTypes, + int maxRowCountPerPartition, + boolean produceRowNumber, + int expectedPositions) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.inputDataTypes = inputDataTypes; + this.outputChannels = ImmutableList.copyOf(outputChannels); + this.partitionChannels = ImmutableList.copyOf(partitionChannels); + this.maxRowCountPerPartition = maxRowCountPerPartition; + this.produceRowNumber = produceRowNumber; + + List outputDataTypes = new ArrayList<>(); + for (int channel : outputChannels) { + outputDataTypes.add(inputDataTypes.get(channel)); + } + if (produceRowNumber) { + outputDataTypes.add(TSDataType.INT64); + } + this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes); + + if (partitionChannels.isEmpty()) { + this.groupByHash = Optional.empty(); + } else { + List partitionTypes = new ArrayList<>(partitionTSDataTypes.size()); + for (TSDataType tsDataType : partitionTSDataTypes) { + partitionTypes.add(InternalTypeManager.fromTSDataType(tsDataType)); + } + this.groupByHash = + Optional.of( + createGroupByHash(partitionTypes, false, expectedPositions, UpdateMemory.NOOP)); + } + + this.partitionRowCounts = new HashMap<>(expectedPositions); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + TsBlock input = inputOperator.nextWithTimer(); + if (input == null) { + return null; + } + + int positionCount = input.getPositionCount(); + int[] partitionIds = getPartitionIds(input); + + for (int position = 0; position < positionCount; position++) { + int partitionId = groupByHash.isPresent() ? 
partitionIds[position] : 0; + long rowCount = partitionRowCounts.getOrDefault(partitionId, 0L); + + if (rowCount < maxRowCountPerPartition) { + emitRow(input, position, rowCount + 1); + } + + partitionRowCounts.put(partitionId, rowCount + 1); + } + + if (tsBlockBuilder.getPositionCount() == 0) { + tsBlockBuilder.reset(); + return null; + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + tsBlockBuilder.reset(); + return result; + } + + private void emitRow(TsBlock tsBlock, int position, long rowNumber) { + for (int i = 0; i < outputChannels.size(); i++) { + Column column = tsBlock.getColumn(outputChannels.get(i)); + ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(i); + if (column.isNull(position)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(column, position); + } + } + + if (produceRowNumber) { + tsBlockBuilder.getColumnBuilder(outputChannels.size()).writeLong(rowNumber); + } + + tsBlockBuilder.declarePosition(); + } + + private int[] getPartitionIds(TsBlock tsBlock) { + if (groupByHash.isPresent()) { + Column[] partitionColumns = new Column[partitionChannels.size()]; + for (int i = 0; i < partitionChannels.size(); i++) { + partitionColumns[i] = tsBlock.getColumn(partitionChannels.get(i)); + } + return groupByHash.get().getGroupIds(partitionColumns); + } + return new int[0]; + } + + @Override + public boolean hasNext() throws Exception { + return inputOperator.hasNextWithTimer(); + } + + @Override + public void close() throws Exception { + inputOperator.close(); + } + + @Override + public boolean isFinished() throws Exception { + return !hasNext(); + } + + @Override + public ListenableFuture isBlocked() { + return inputOperator.isBlocked(); + } + + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemoryFromInput = inputOperator.calculateMaxPeekMemoryWithCounter(); + long maxTsBlockSize = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + return Math.max(maxPeekMemoryFromInput, maxTsBlockSize) + + inputOperator.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long calculateMaxReturnSize() { + return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return inputOperator.calculateRetainedSizeAfterCallingNext(); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + tsBlockBuilder.getRetainedSizeInBytes(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 1bc788251a764..cfcfb4a99f469 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -101,6 +101,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.IrRowPatternToProgramRewriter; import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Matcher; import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Program; +import 
org.apache.iotdb.db.queryengine.execution.operator.process.window.LimitKRankingOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.window.RowNumberOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.window.TableWindowOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.window.TopKRankingOperator; @@ -198,6 +199,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; @@ -4372,4 +4374,46 @@ public Operator visitTopKRanking(TopKRankingNode node, LocalExecutionPlanContext 1000, Optional.empty()); } + + @Override + public Operator visitLimitKRanking(LimitKRankingNode node, LocalExecutionPlanContext context) { + Operator child = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + LimitKRankingOperator.class.getSimpleName()); + + List partitionBySymbols = node.getSpecification().getPartitionBy(); + Map childLayout = + makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols()); + List partitionChannels = getChannelsForSymbols(partitionBySymbols, childLayout); + List inputDataTypes = + getOutputColumnTypes(node.getChild(), context.getTypeProvider()); + List partitionTypes = + partitionChannels.stream().map(inputDataTypes::get).collect(toImmutableList()); + + ImmutableList.Builder outputChannels = ImmutableList.builder(); + for (int i = 0; i < inputDataTypes.size(); i++) { + outputChannels.add(i); + } + + ImmutableMap.Builder outputMappings = ImmutableMap.builder(); + outputMappings.putAll(childLayout); + int channel = inputDataTypes.size(); + outputMappings.put(node.getRankingSymbol(), channel); + + return new LimitKRankingOperator( + operatorContext, + child, + inputDataTypes, + outputChannels.build(), + partitionChannels, + partitionTypes, + node.getMaxRowCountPerPartition(), + true, + 1000); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 180241b4192cd..ae24abe803d55 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -79,6 +79,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -1155,6 +1156,24 @@ public List visitTopKRanking(TopKRankingNode node, GraphContext context) return render(node, boxValue, context); } + @Override + public List visitLimitKRanking(LimitKRankingNode node, GraphContext context) { + List boxValue = new ArrayList<>(); + + boxValue.add(String.format("LimitKRanking-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("RankingSymbol: %s", node.getRankingSymbol())); + boxValue.add(String.format("MaxRowCount: %d", node.getMaxRowCountPerPartition())); + DataOrganizationSpecification specification = node.getSpecification(); + if (!specification.getPartitionBy().isEmpty()) { + boxValue.add("Partition by: [" + Joiner.on(", ").join(specification.getPartitionBy()) + "]"); + } + specification + .getOrderingScheme() + .ifPresent(orderingScheme -> boxValue.add("Order by: " + orderingScheme)); + + return render(node, boxValue, context); + } + private String printRegion(TRegionReplicaSet regionReplicaSet) { return String.format( "Partition: %s", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 55aaefe8be618..c9c118e6a3cd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -125,6 +125,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -324,6 +325,8 @@ public enum PlanNodeType { TABLE_ROW_NUMBER_NODE((short) 1038), TABLE_VALUES_NODE((short) 1039), + TABLE_LIMITK_RANKING_NODE((short) 1040), + RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), RELATIONAL_INSERT_ROWS((short) 2002), @@ -727,6 +730,8 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) { return RowNumberNode.deserialize(buffer); case 1039: return ValuesNode.deserialize(buffer); + case 1040: + return LimitKRankingNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 44f1cd8bc1f67..d1a9958dea52a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -795,6 +795,12 @@ public R visitTopKRanking( return visitSingleChildProcess(node, context); } + public R visitLimitKRanking( + org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode node, + C context) { + return 
visitSingleChildProcess(node, context); + } + public R visitRowNumber(RowNumberNode node, C context) { return visitSingleChildProcess(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 7072b5f519f73..9defd43755bfd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -69,6 +69,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; @@ -1914,6 +1915,36 @@ public List visitTopKRanking(TopKRankingNode node, PlanContext context } } + @Override + public List visitLimitKRanking(LimitKRankingNode node, PlanContext context) { + Optional orderingScheme = node.getSpecification().getOrderingScheme(); + if (orderingScheme.isPresent()) { + context.setExpectedOrderingScheme(orderingScheme.get()); + nodeOrderingMap.put(node.getPlanNodeId(), orderingScheme.get()); + } + + checkArgument( + node.getChildren().size() == 1, "Size of LimitKRankingNode can only be 1 in logical plan."); + boolean canSplitPushDown = node.getChild() instanceof GroupNode; + if (!canSplitPushDown) { + node.setChild(((SortNode) node.getChild()).getChild()); + } + List childrenNodes = node.getChildren().get(0).accept(this, context); + + if (childrenNodes.size() == 1) { + node.setChild(childrenNodes.get(0)); + return Collections.singletonList(node); + } else if (!canSplitPushDown) { + CollectNode collectNode = + new CollectNode(queryId.genPlanNodeId(), node.getChildren().get(0).getOutputSymbols()); + childrenNodes.forEach(collectNode::addChild); + node.setChild(collectNode); + return Collections.singletonList(node); + } else { + return splitForEachChild(node, childrenNodes); + } + } + @Override public List visitUnion(UnionNode node, PlanContext context) { context.clearExpectedOrderingScheme(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java index d7ecab9cabfe4..c1c5e7734ce35 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import 
org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValuesNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; @@ -42,6 +44,7 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.canUseLimitKRanking; import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source; @@ -90,15 +93,26 @@ public Result apply(FilterNode node, Captures captures, Context context) { new ValuesNode(node.getPlanNodeId(), node.getOutputSymbols(), ImmutableList.of())); } - TopKRankingNode newSource = - new TopKRankingNode( - windowNode.getPlanNodeId(), - windowNode.getChild(), - windowNode.getSpecification(), - rankingType.get(), - rankingSymbol, - upperBound.getAsInt(), - false); + PlanNode newSource; + if (canUseLimitKRanking(windowNode, rankingType.get(), context.getSymbolAllocator())) { + newSource = + new LimitKRankingNode( + windowNode.getPlanNodeId(), + windowNode.getChild(), + windowNode.getSpecification(), + rankingSymbol, + upperBound.getAsInt()); + } else { + newSource = + new TopKRankingNode( + windowNode.getPlanNodeId(), + windowNode.getChild(), + windowNode.getSpecification(), + rankingType.get(), + rankingSymbol, + upperBound.getAsInt(), + false); + } if (needToKeepFilter(node.getPredicate(), rankingSymbol, upperBound.getAsInt())) { return Result.ofPlanNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java index e3ce8619c5beb..b8f2f415d0a55 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java @@ -20,7 +20,9 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; @@ -34,6 +36,7 @@ import static com.google.common.collect.Iterables.getOnlyElement; import 
static java.lang.Math.toIntExact; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.canUseLimitKRanking; import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChildReplacer.replaceChildren; import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit; @@ -83,18 +86,31 @@ public Result apply(LimitNode node, Captures captures, Context context) { Optional rankingType = toTopNRankingType(source); int limit = toIntExact(node.getCount()); - TopKRankingNode topNRowNumberNode = - new TopKRankingNode( - source.getPlanNodeId(), - source.getChild(), - source.getSpecification(), - rankingType.get(), - getOnlyElement(source.getWindowFunctions().keySet()), - limit, - false); + + PlanNode rankingNode; + if (canUseLimitKRanking(source, rankingType.get(), context.getSymbolAllocator())) { + rankingNode = + new LimitKRankingNode( + source.getPlanNodeId(), + source.getChild(), + source.getSpecification(), + getOnlyElement(source.getWindowFunctions().keySet()), + limit); + } else { + rankingNode = + new TopKRankingNode( + source.getPlanNodeId(), + source.getChild(), + source.getSpecification(), + rankingType.get(), + getOnlyElement(source.getWindowFunctions().keySet()), + limit, + false); + } + if (rankingType.get() == ROW_NUMBER && source.getSpecification().getPartitionBy().isEmpty()) { - return Result.ofPlanNode(topNRowNumberNode); + return Result.ofPlanNode(rankingNode); } - return Result.ofPlanNode(replaceChildren(node, ImmutableList.of(topNRowNumberNode))); + return Result.ofPlanNode(replaceChildren(node, ImmutableList.of(rankingNode))); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java index 5845ffd422105..a96681821d164 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java @@ -22,7 +22,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature; import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; @@ -32,6 +34,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.apache.tsfile.read.common.type.TimestampType; +import org.apache.tsfile.read.common.type.Type; import java.util.Collection; import java.util.List; @@ -142,4 +146,34 @@ public static Optional toTopNRankingType(WindowNode } return Optional.empty(); } + + /** + * Returns true when the window is ROW_NUMBER with ORDER BY on a single TIMESTAMP column (ASC or + * DESC). 
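+ * For example, {@code row_number() OVER (PARTITION BY device ORDER BY time DESC)} qualifies,
+ * whereas an ORDER BY on a non-TIMESTAMP measurement column does not.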
In IoTDB, data is naturally sorted by (device, time) and table scans support both + * forward and reverse iteration, so we can use the streaming LimitKRankingNode instead of the + * buffered TopKRankingNode for both ASC (earliest K) and DESC (latest K). + */ + public static boolean canUseLimitKRanking( + WindowNode windowNode, + TopKRankingNode.RankingType rankingType, + SymbolAllocator symbolAllocator) { + if (rankingType != ROW_NUMBER && rankingType != RANK) { + return false; + } + + Optional orderingScheme = windowNode.getSpecification().getOrderingScheme(); + if (!orderingScheme.isPresent()) { + return false; + } + + List orderBy = orderingScheme.get().getOrderBy(); + if (orderBy.size() != 1) { + return false; + } + + Symbol orderSymbol = orderBy.get(0); + + Type type = symbolAllocator.getTypes().getTableModelType(orderSymbol); + return type instanceof TimestampType; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java new file mode 100644 index 0000000000000..a78c9f9bbdafc --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Plan node for the streaming per-partition limit optimization. Used when the input data is already + * sorted by (partition_keys, time), so we can take the first K rows per partition without sorting. 
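+ * <p>This node is introduced by the {@code PushDownFilterIntoWindow} and
+ * {@code PushDownLimitIntoWindow} rules when {@code Util#canUseLimitKRanking} holds, and is
+ * executed by {@code LimitKRankingOperator}.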
+ */ +public class LimitKRankingNode extends SingleChildProcessNode { + + private final DataOrganizationSpecification specification; + private final Symbol rankingSymbol; + private final int maxRowCountPerPartition; + + public LimitKRankingNode( + PlanNodeId id, + DataOrganizationSpecification specification, + Symbol rankingSymbol, + int maxRowCountPerPartition) { + super(id); + this.specification = specification; + this.rankingSymbol = rankingSymbol; + this.maxRowCountPerPartition = maxRowCountPerPartition; + } + + public LimitKRankingNode( + PlanNodeId id, + PlanNode child, + DataOrganizationSpecification specification, + Symbol rankingSymbol, + int maxRowCountPerPartition) { + super(id, child); + this.specification = specification; + this.rankingSymbol = rankingSymbol; + this.maxRowCountPerPartition = maxRowCountPerPartition; + } + + @Override + public PlanNode clone() { + return new LimitKRankingNode( + getPlanNodeId(), specification, rankingSymbol, maxRowCountPerPartition); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLimitKRanking(this, context); + } + + public DataOrganizationSpecification getSpecification() { + return specification; + } + + public Symbol getRankingSymbol() { + return rankingSymbol; + } + + public int getMaxRowCountPerPartition() { + return maxRowCountPerPartition; + } + + @Override + public List getOutputColumnNames() { + throw new UnsupportedOperationException(); + } + + @Override + public List getOutputSymbols() { + return ImmutableList.builder() + .addAll(getChild().getOutputSymbols()) + .add(rankingSymbol) + .build(); + } + + @Override + public PlanNode replaceChildren(List newChildren) { + return new LimitKRankingNode( + id, + Iterables.getOnlyElement(newChildren), + specification, + rankingSymbol, + maxRowCountPerPartition); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_LIMITK_RANKING_NODE.serialize(byteBuffer); + specification.serialize(byteBuffer); + Symbol.serialize(rankingSymbol, byteBuffer); + ReadWriteIOUtils.write(maxRowCountPerPartition, byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_LIMITK_RANKING_NODE.serialize(stream); + specification.serialize(stream); + Symbol.serialize(rankingSymbol, stream); + ReadWriteIOUtils.write(maxRowCountPerPartition, stream); + } + + public static LimitKRankingNode deserialize(ByteBuffer byteBuffer) { + DataOrganizationSpecification specification = + DataOrganizationSpecification.deserialize(byteBuffer); + Symbol rankingSymbol = Symbol.deserialize(byteBuffer); + int maxRowCountPerPartition = ReadWriteIOUtils.readInt(byteBuffer); + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new LimitKRankingNode(planNodeId, specification, rankingSymbol, maxRowCountPerPartition); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + LimitKRankingNode that = (LimitKRankingNode) o; + return Objects.equal(specification, that.specification) + && Objects.equal(rankingSymbol, that.rankingSymbol) + && Objects.equal(maxRowCountPerPartition, that.maxRowCountPerPartition); + } + + @Override + public int hashCode() { + return Objects.hashCode( + super.hashCode(), specification, rankingSymbol, maxRowCountPerPartition); + } + + @Override + public String toString() { + return "LimitKRankingNode-" + 
this.getPlanNodeId(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java index bb276f07150b9..bdc986e63da41 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode; @@ -31,6 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import java.util.Collections; @@ -72,10 +74,13 @@ public PlanNode visitSort(SortNode node, Context context) { OrderingScheme orderingScheme = node.getOrderingScheme(); if (context.canEliminateSort() && newContext.getTotalDeviceEntrySize() == 1 - && orderingScheme.getOrderBy().get(0).getName().equals(context.getTimeColumnName())) { + && orderingScheme.getOrderBy().get(0).getName().equals(context.getTimeColumnName()) + && timeSortDirectionMatchesScanOrder(orderingScheme, newContext)) { return child; } - return context.canEliminateSort() && node.isOrderByAllIdsAndTime() + return context.canEliminateSort() + && node.isOrderByAllIdsAndTime() + && timeSortDirectionMatchesScanOrder(orderingScheme, newContext) ? child : node.replaceChildren(Collections.singletonList(child)); } @@ -86,17 +91,39 @@ public PlanNode visitStreamSort(StreamSortNode node, Context context) { PlanNode child = node.getChild().accept(this, newContext); context.setCannotEliminateSort(newContext.cannotEliminateSort); return context.canEliminateSort() - && (node.isOrderByAllIdsAndTime() + && ((node.isOrderByAllIdsAndTime() + && timeSortDirectionMatchesScanOrder(node.getOrderingScheme(), newContext)) || node.getStreamCompareKeyEndIndex() == node.getOrderingScheme().getOrderBy().size() - 1) ? child : node.replaceChildren(Collections.singletonList(child)); } + /** + * Checks whether the sort direction on the time column matches the underlying scan order. When + * the scan is DESC (e.g. due to LimitKRanking with ORDER BY time DESC), a sort expecting ASC + * must not be eliminated. 
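+ * For example, {@code ORDER BY time ASC} above a descending device scan still requires the
+ * SortNode, while {@code ORDER BY time DESC} above the same scan may allow it to be dropped
+ * (subject to the other elimination checks).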
+ */ + private boolean timeSortDirectionMatchesScanOrder( + OrderingScheme orderingScheme, Context childContext) { + if (childContext.getTimeColumnName() == null) { + return true; + } + for (Symbol symbol : orderingScheme.getOrderBy()) { + if (symbol.getName().equals(childContext.getTimeColumnName())) { + SortOrder sortOrder = orderingScheme.getOrdering(symbol); + boolean scanAscending = childContext.getScanOrder() == Ordering.ASC; + return sortOrder.isAscending() == scanAscending; + } + } + return true; + } + @Override public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context context) { context.addDeviceEntrySize(node.getDeviceEntries().size()); context.setTimeColumnName(node.getTimeColumn().map(Symbol::getName).orElse(null)); + context.setScanOrder(node.getScanOrder()); return node; } @@ -152,6 +179,8 @@ private static class Context { private String timeColumnName = null; + private Ordering scanOrder = Ordering.ASC; + Context() {} public void addDeviceEntrySize(int deviceEntrySize) { @@ -177,5 +206,13 @@ public String getTimeColumnName() { public void setTimeColumnName(String timeColumnName) { this.timeColumnName = timeColumnName; } + + public Ordering getScanOrder() { + return scanOrder; + } + + public void setScanOrder(Ordering scanOrder) { + this.scanOrder = scanOrder; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java index 002e59f124c9f..e03140fb42aa6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExpressionTreeRewriter; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -300,6 +301,15 @@ public TopKRankingNode map(TopKRankingNode node, PlanNode source) { node.isPartial()); } + public LimitKRankingNode map(LimitKRankingNode node, PlanNode source) { + return new LimitKRankingNode( + node.getPlanNodeId(), + source, + mapAndDistinct(node.getSpecification()), + map(node.getRankingSymbol()), + node.getMaxRowCountPerPartition()); + } + public RowNumberNode map(RowNumberNode node, PlanNode source) { return new RowNumberNode( node.getPlanNodeId(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index b7f2e9eaaa851..540e20d043651 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -46,6 +46,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; @@ -650,6 +651,17 @@ public PlanAndMappings visitTopKRanking(TopKRankingNode node, UnaliasContext con return new PlanAndMappings(rewrittenTopKRanking, mapping); } + @Override + public PlanAndMappings visitLimitKRanking(LimitKRankingNode node, UnaliasContext context) { + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + Map mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + + LimitKRankingNode rewrittenNode = mapper.map(node, rewrittenSource.getRoot()); + + return new PlanAndMappings(rewrittenNode, mapping); + } + @Override public PlanAndMappings visitOutput(OutputNode node, UnaliasContext context) { PlanAndMappings rewrittenSource = node.getChild().accept(this, context); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java new file mode 100644 index 0000000000000..9acae751378b3 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java
new file mode 100644
index 0000000000000..9acae751378b3
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.window;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class LimitKRankingOperatorTest {
+  private static final ExecutorService instanceNotificationExecutor =
+      IoTDBThreadPoolFactory.newFixedThreadPool(1, "limitKRanking-test-instance-notification");
+
+  private static final List<TSDataType> INPUT_DATA_TYPES =
+      Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32);
+
+  @Test
+  public void testSingleBlockMultiPartition() {
+    // Input: d1 has 3 rows, d2 has 3 rows; K=2 → keep first 2 per partition
+    long[][] timeArray = {{1, 2, 3, 4, 5, 6}};
+    String[][] deviceArray = {{"d1", "d1", "d1", "d2", "d2", "d2"}};
+    int[][] valueArray = {{10, 20, 30, 40, 50, 60}};
+
+    long[] expectTime = {1, 2, 4, 5};
+    String[] expectDevice = {"d1", "d1", "d2", "d2"};
+    int[] expectValue = {10, 20, 40, 50};
+
+    verifyOperatorOutput(
+        timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
+  }
+
+  @Test
+  public void testPartitionCrossMultiBlocks() {
+    // d1 spans block0 and block1; d2 spans block1 and block2; K=3
+    long[][] timeArray = {{1, 2}, {3, 4, 5}, {6, 7, 8}};
+    String[][] deviceArray = {{"d1", "d1"}, {"d1", "d1", "d2"}, {"d2", "d2", "d2"}};
+    int[][] valueArray = {{10, 20}, {30, 40, 50}, {60, 70, 80}};
+
+    long[] expectTime = {1, 2, 3, 5, 6, 7};
+    String[] expectDevice = {"d1", "d1", "d1", "d2", "d2", "d2"};
+    int[] expectValue = {10, 20, 30, 50, 60, 70};
+
+    verifyOperatorOutput(
+        timeArray, deviceArray, valueArray, 3, false, expectTime, expectDevice, expectValue, null);
+  }
+
+  @Test
+  public void testWithRowNumber() {
+    long[][] timeArray = {{1, 2, 3, 4}};
+    String[][] deviceArray = {{"d1", "d1", "d2", "d2"}};
+    int[][] valueArray = {{10, 20, 30, 40}};
+
+    long[] expectTime = {1, 2, 3, 4};
+    String[] expectDevice = {"d1", "d1", "d2", "d2"};
+    int[] expectValue = {10, 20, 30, 40};
+    long[] expectRowNumber = {1, 2, 1, 2};
+
+    verifyOperatorOutput(
+        timeArray,
+        deviceArray,
+        valueArray,
+        2,
+        true,
+        expectTime,
+        expectDevice,
+        expectValue,
+        expectRowNumber);
+  }
+
+  @Test
+  public void testKLargerThanData() {
+    // K=10, but only 2 rows per partition → all rows emitted
+    long[][] timeArray = {{1, 2, 3, 4}};
+    String[][] deviceArray = {{"d1", "d1", "d2", "d2"}};
+    int[][] valueArray = {{10, 20, 30, 40}};
+
+    long[] expectTime = {1, 2, 3, 4};
+    String[] expectDevice = {"d1", "d1", "d2", "d2"};
+    int[] expectValue = {10, 20, 30, 40};
+
+    verifyOperatorOutput(
+        timeArray, deviceArray, valueArray, 10, false, expectTime, expectDevice, expectValue, null);
+  }
+
+  @Test
+  public void testEntireBlockFiltered() {
+    // K=2, d1 gets 2 rows in block0 → block1 (all d1) should be fully filtered
+    long[][] timeArray = {{1, 2}, {3, 4}};
+    String[][] deviceArray = {{"d1", "d1"}, {"d1", "d1"}};
+    int[][] valueArray = {{10, 20}, {30, 40}};
+
+    long[] expectTime = {1, 2};
+    String[] expectDevice = {"d1", "d1"};
+    int[] expectValue = {10, 20};
+
+    verifyOperatorOutput(
+        timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
+  }
+
+  @Test
+  public void testNoPartition() {
+    // No partition columns → global limit K=3
+    long[][] timeArray = {{1, 2, 3, 4, 5}};
+    String[][] deviceArray = {{"d1", "d1", "d2", "d2", "d2"}};
+    int[][] valueArray = {{10, 20, 30, 40, 50}};
+
+    long[] expectTime = {1, 2, 3};
+    String[] expectDevice = {"d1", "d1", "d2"};
+    int[] expectValue = {10, 20, 30};
+
+    verifyOperatorOutputNoPartition(
+        timeArray, deviceArray, valueArray, 3, false, expectTime, expectDevice, expectValue);
+  }
+
+  @Test
+  public void testPartialFilterInBlock() {
+    // Mixed block: some rows pass, some are filtered
+    // d1: 4 rows, K=2 → only first 2 of d1 pass; d2: 2 rows, K=2 → both pass
+    long[][] timeArray = {{1, 2, 3, 4, 5, 6}};
+    String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2"}};
+    int[][] valueArray = {{10, 20, 30, 40, 50, 60}};
+
+    long[] expectTime = {1, 2, 5, 6};
+    String[] expectDevice = {"d1", "d1", "d2", "d2"};
+    int[] expectValue = {10, 20, 50, 60};
+
+    verifyOperatorOutput(
+        timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
+  }
+
+  @Test
+  public void testNullInputBlock() {
+    // ChildOperator may return null TsBlocks; the operator should skip them
+    long[][] timeArray = {{1, 2}, null, {3, 4}};
+    String[][] deviceArray = {{"d1", "d1"}, null, {"d2", "d2"}};
+    int[][] valueArray = {{10, 20}, null, {30, 40}};
+
+    long[] expectTime = {1, 2, 3, 4};
+    String[] expectDevice = {"d1", "d1", "d2", "d2"};
+    int[] expectValue = {10, 20, 30, 40};
+
+    verifyOperatorOutput(
+        timeArray, deviceArray, valueArray, 5, false, expectTime, expectDevice, expectValue, null);
+  }
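Editor's note: testEntireBlockFiltered is the case where the streaming approach pays off most. Once a partition has emitted its k rows, a later block containing only that partition can be dropped without per-row work. A hypothetical check for that fast path is sketched below; the names and the String partition key are illustrative, and the real operator compares partition values on TsBlock columns rather than strings.

    // Skip an incoming block wholesale when it holds only the current partition
    // and that partition has already emitted k rows (input is grouped by partition).
    static boolean canSkipWholeBlock(
        String firstRowPartition,
        String lastRowPartition,
        String currentPartition,
        long emittedInPartition,
        long k) {
      return emittedInPartition >= k
          && firstRowPartition.equals(currentPartition)
          && lastRowPartition.equals(currentPartition);
    }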
+
+  // ======================== Helper Methods ========================
+
+  private void verifyOperatorOutput(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      int k,
+      boolean produceRowNumber,
+      long[] expectTime,
+      String[] expectDevice,
+      int[] expectValue,
+      long[] expectRowNumber) {
+    int count = 0;
+    try (LimitKRankingOperator operator =
+        createOperator(timeArray, deviceArray, valueArray, k, produceRowNumber, true)) {
+      ListenableFuture<?> listenableFuture = operator.isBlocked();
+      listenableFuture.get();
+      while (!operator.isFinished() && operator.hasNext()) {
+        TsBlock tsBlock = operator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
+            assertEquals(expectTime[count], tsBlock.getColumn(0).getLong(i));
+            assertEquals(
+                expectDevice[count],
+                tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            assertEquals(expectValue[count], tsBlock.getColumn(2).getInt(i));
+            if (produceRowNumber && expectRowNumber != null) {
+              assertEquals(expectRowNumber[count], tsBlock.getColumn(3).getLong(i));
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    assertEquals(expectTime.length, count);
+  }
+
+  private void verifyOperatorOutputNoPartition(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      int k,
+      boolean produceRowNumber,
+      long[] expectTime,
+      String[] expectDevice,
+      int[] expectValue) {
+    int count = 0;
+    try (LimitKRankingOperator operator =
+        createOperator(timeArray, deviceArray, valueArray, k, produceRowNumber, false)) {
+      ListenableFuture<?> listenableFuture = operator.isBlocked();
+      listenableFuture.get();
+      while (!operator.isFinished() && operator.hasNext()) {
+        TsBlock tsBlock = operator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
+            assertEquals(expectTime[count], tsBlock.getColumn(0).getLong(i));
+            assertEquals(
+                expectDevice[count],
+                tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+            assertEquals(expectValue[count], tsBlock.getColumn(2).getInt(i));
+          }
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    assertEquals(expectTime.length, count);
+  }
+
+  private LimitKRankingOperator createOperator(
+      long[][] timeArray,
+      String[][] deviceArray,
+      int[][] valueArray,
+      int k,
+      boolean produceRowNumber,
+      boolean hasPartition) {
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNodeId, LimitKRankingOperator.class.getSimpleName());
+
+    Operator childOperator = new ChildOperator(timeArray, deviceArray, valueArray, driverContext);
+
+    List<Integer> outputChannels = Arrays.asList(0, 1, 2);
+    List<Integer> partitionChannels;
+    List<TSDataType> partitionTSDataTypes;
+    if (hasPartition) {
+      partitionChannels = Collections.singletonList(1);
+      partitionTSDataTypes = Collections.singletonList(TSDataType.TEXT);
+    } else {
+      partitionChannels = Collections.emptyList();
+      partitionTSDataTypes = Collections.emptyList();
+    }
+
+    return new LimitKRankingOperator(
+        driverContext.getOperatorContexts().get(0),
+        childOperator,
+        INPUT_DATA_TYPES,
+        outputChannels,
+        partitionChannels,
+        partitionTSDataTypes,
+        k,
+        produceRowNumber,
+        16);
+  }
+
+  static class ChildOperator implements Operator {
+    private int index;
+    private final long[][] timeArray;
+    private final String[][] deviceArray;
+    private final int[][] valueArray;
+    private final DriverContext driverContext;
+
+    ChildOperator(
+        long[][] timeArray,
+        String[][] deviceArray,
+        int[][] valueArray,
+        DriverContext driverContext) {
+      this.timeArray = timeArray;
+      this.deviceArray = deviceArray;
+      this.valueArray = valueArray;
+      this.driverContext = driverContext;
+      this.index = 0;
+    }
+
+    @Override
+    public OperatorContext getOperatorContext() {
return driverContext.getOperatorContexts().get(0); + } + + @Override + public TsBlock next() { + if (timeArray[index] == null) { + index++; + return null; + } + TsBlockBuilder builder = + new TsBlockBuilder( + timeArray[index].length, + Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32)); + for (int i = 0, size = timeArray[index].length; i < size; i++) { + builder.getColumnBuilder(0).writeLong(timeArray[index][i]); + builder + .getColumnBuilder(1) + .writeBinary(new Binary(deviceArray[index][i], TSFileConfig.STRING_CHARSET)); + builder.getColumnBuilder(2).writeInt(valueArray[index][i]); + } + builder.declarePositions(timeArray[index].length); + index++; + return builder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, builder.getPositionCount())); + } + + @Override + public boolean hasNext() { + return index < timeArray.length; + } + + @Override + public boolean isFinished() { + return index >= timeArray.length; + } + + @Override + public void close() {} + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java index e31f2f7e58065..5f3dc0b3dcca4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java @@ -31,6 +31,8 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.group; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.limit; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.limitKRanking; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.rowNumber; @@ -291,10 +293,246 @@ public void testRowNumberPushDown() { * │ └──TableScan * └──ExchangeNode * └──RowNumberNode - * └──TableScan + * └──MergeSortNode + * ├──ExchangeNode + * │ └──TableScan + * └──ExchangeNode + * └──TableScan */ assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange()))); assertPlan(planTester.getFragmentPlan(1), rowNumber(tableScan)); assertPlan(planTester.getFragmentPlan(2), rowNumber(tableScan)); + assertPlan(planTester.getFragmentPlan(3), rowNumber(mergeSort(exchange(), exchange()))); + assertPlan(planTester.getFragmentPlan(4), tableScan); + assertPlan(planTester.getFragmentPlan(5), tableScan); + } + + @Test + public void testLimitKRankingPushDownFilterOrderByTime() { + PlanTester planTester = new PlanTester(); + + // ORDER BY time 
triggers LimitKRankingNode instead of TopKRankingNode + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time) as rn FROM table1) WHERE rn <= 3"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitKRankingNode + * └──SortNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limitKRanking(sort(tableScan)))); + + /* OutputNode + * └──LimitKRankingNode + * └──CollectNode + * ├──ExchangeNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──TableScan + * └──ExchangeNode + * └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), + output(limitKRanking(collect(exchange(), exchange(), exchange())))); + assertPlan(planTester.getFragmentPlan(1), tableScan); + assertPlan(planTester.getFragmentPlan(2), tableScan); + assertPlan(planTester.getFragmentPlan(3), tableScan); + } + + @Test + public void testLimitKRankingPushDownFilterOrderByTimeWithAllTags() { + PlanTester planTester = new PlanTester(); + + // All tag columns in PARTITION BY + ORDER BY time → LimitKRankingNode with GroupNode push down + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) as rn FROM table1) WHERE rn <= 2"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitKRankingNode + * └──GroupNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limitKRanking(group(tableScan)))); + + /* OutputNode + * └──CollectNode + * ├──ExchangeNode + * │ └──LimitKRankingNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──LimitKRankingNode + * │ └──TableScan + * └──ExchangeNode + * └──LimitKRankingNode + * └──SortNode + * └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), output((collect(exchange(), exchange(), exchange())))); + assertPlan(planTester.getFragmentPlan(1), limitKRanking(tableScan)); + assertPlan(planTester.getFragmentPlan(2), limitKRanking(tableScan)); + assertPlan(planTester.getFragmentPlan(3), limitKRanking(mergeSort(exchange(), exchange()))); + assertPlan(planTester.getFragmentPlan(4), tableScan); + assertPlan(planTester.getFragmentPlan(5), tableScan); + } + + @Test + public void testLimitKRankingPushDownFilterOrderByTimeDesc() { + PlanTester planTester = new PlanTester(); + + // ORDER BY time DESC triggers LimitKRankingNode instead of TopKRankingNode + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time DESC) as rn FROM table1) WHERE rn <= 3"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitKRankingNode + * └──SortNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limitKRanking(sort(tableScan)))); + + /* OutputNode + * └──LimitKRankingNode + * └──CollectNode + * ├──ExchangeNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──TableScan + * └──ExchangeNode + * └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), + output(limitKRanking(collect(exchange(), exchange(), exchange())))); + assertPlan(planTester.getFragmentPlan(1), tableScan); + assertPlan(planTester.getFragmentPlan(2), tableScan); + assertPlan(planTester.getFragmentPlan(3), tableScan); + } + + @Test + public void testLimitKRankingPushDownFilterOrderByTimeDescWithAllTags() { + PlanTester planTester = new PlanTester(); + + // All tag 
columns in PARTITION BY + ORDER BY time DESC → LimitKRankingNode with GroupNode push + // down + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time DESC) as rn FROM table1) WHERE rn <= 2"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitKRankingNode + * └──GroupNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limitKRanking(group(tableScan)))); + + /* OutputNode + * └──CollectNode + * ├──ExchangeNode + * │ └──LimitKRankingNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──LimitKRankingNode + * │ └──TableScan + * └──ExchangeNode + * └──LimitKRankingNode + * └──SortNode + * └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), output((collect(exchange(), exchange(), exchange())))); + assertPlan(planTester.getFragmentPlan(1), limitKRanking(tableScan)); + assertPlan(planTester.getFragmentPlan(2), limitKRanking(tableScan)); + assertPlan(planTester.getFragmentPlan(3), limitKRanking(mergeSort(exchange(), exchange()))); + assertPlan(planTester.getFragmentPlan(4), tableScan); + assertPlan(planTester.getFragmentPlan(5), tableScan); + } + + @Test + public void testLimitKRankingPushDownLimitOrderByTimeDesc() { + PlanTester planTester = new PlanTester(); + + // LIMIT pushdown + ORDER BY time DESC → LimitKRankingNode + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time DESC) as rn FROM table1) LIMIT 5"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitNode + * └──LimitKRankingNode + * └──SortNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limit(5, limitKRanking(sort(tableScan))))); + + /* OutputNode + * └──LimitNode + * └──LimitKRankingNode + * └──CollectNode + * ├──ExchangeNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──TableScan + * └──ExchangeNode + * └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), + output(limit(5, limitKRanking(collect(exchange(), exchange(), exchange()))))); + assertPlan(planTester.getFragmentPlan(1), tableScan); + assertPlan(planTester.getFragmentPlan(2), tableScan); + assertPlan(planTester.getFragmentPlan(3), tableScan); + } + + @Test + public void testLimitKRankingPushDownLimitOrderByTime() { + PlanTester planTester = new PlanTester(); + + // LIMIT pushdown + ORDER BY time → LimitKRankingNode + String sql = + "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time) as rn FROM table1) LIMIT 5"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = tableScan("testdb.table1"); + + /* + * └──OutputNode + * └──LimitNode + * └──LimitKRankingNode + * └──SortNode + * └──TableScanNode + */ + assertPlan(logicalQueryPlan, output(limit(5, limitKRanking(sort(tableScan))))); + + /* OutputNode + * └──LimitNode + * └──LimitKRankingNode + * └──CollectNode + * ├──ExchangeNode + * │ └──TableScan + * ├──ExchangeNode + * │ └──TableScan + * └──ExchangeNode + * └──TableScan + */ + assertPlan( + planTester.getFragmentPlan(0), + output(limit(5, limitKRanking(collect(exchange(), exchange(), exchange()))))); + assertPlan(planTester.getFragmentPlan(1), tableScan); + assertPlan(planTester.getFragmentPlan(2), tableScan); + assertPlan(planTester.getFragmentPlan(3), tableScan); } } diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 03f79fd2ec29a..f2b73d744599c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -40,6 +40,7 @@
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
@@ -477,6 +478,10 @@ public static PlanMatchPattern topKRanking(PlanMatchPattern source) {
     return node(TopKRankingNode.class, source);
   }
 
+  public static PlanMatchPattern limitKRanking(PlanMatchPattern source) {
+    return node(LimitKRankingNode.class, source);
+  }
+
   public static PlanMatchPattern rowNumber(PlanMatchPattern source) {
     return node(RowNumberNode.class, source);
   }
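Editor's note: the new matcher composes exactly like the existing ones; the WindowFunctionOptimizationTest cases above use it, for example, as:

    assertPlan(logicalQueryPlan, output(limitKRanking(sort(tableScan("testdb.table1")))));
    assertPlan(planTester.getFragmentPlan(1), limitKRanking(tableScan("testdb.table1")));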