diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
new file mode 100644
index 0000000000000..de90dd2009eba
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLimitKRankingIT.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for the LimitKRanking optimization. When using {@code ROW_NUMBER() OVER
+ * (PARTITION BY device ORDER BY time)}, the planner should produce a streaming LimitKRankingNode
+ * instead of the buffered TopKRankingNode, since data is already sorted by (device, time).
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBLimitKRankingIT {
+ private static final String DATABASE_NAME = "test";
+ private static final String[] sqls =
+ new String[] {
+ "CREATE DATABASE " + DATABASE_NAME,
+ "USE " + DATABASE_NAME,
+ "create table demo (device string tag, value double field)",
+ // d1: 4 rows
+ "insert into demo values (2021-01-01T09:05:00, 'd1', 3)",
+ "insert into demo values (2021-01-01T09:07:00, 'd1', 5)",
+ "insert into demo values (2021-01-01T09:09:00, 'd1', 3)",
+ "insert into demo values (2021-01-01T09:10:00, 'd1', 1)",
+ // d2: 2 rows
+ "insert into demo values (2021-01-01T09:08:00, 'd2', 2)",
+ "insert into demo values (2021-01-01T09:15:00, 'd2', 4)",
+ // d3: 5 rows
+ "insert into demo values (2021-01-01T09:01:00, 'd3', 10)",
+ "insert into demo values (2021-01-01T09:02:00, 'd3', 20)",
+ "insert into demo values (2021-01-01T09:03:00, 'd3', 30)",
+ "insert into demo values (2021-01-01T09:04:00, 'd3', 40)",
+ "insert into demo values (2021-01-01T09:06:00, 'd3', 50)",
+ "FLUSH",
+ "CLEAR ATTRIBUTE CACHE",
+ };
+
+ protected static void insertData() {
+ try (Connection connection = EnvFactory.getEnv().getTableConnection();
+ Statement statement = connection.createStatement()) {
+ for (String sql : sqls) {
+ statement.execute(sql);
+ }
+ } catch (Exception e) {
+ fail("insertData failed.");
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 1024);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTime() {
+ // ROW_NUMBER() OVER (PARTITION BY device ORDER BY time) WHERE rn <= 2
+ // Should use LimitKRankingOperator: take first 2 time-ordered rows per device
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+ "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+ "2021-01-01T09:01:00.000Z,d3,10.0,1,",
+ "2021-01-01T09:02:00.000Z,d3,20.0,2,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo"
+ + ") WHERE rn <= 2 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeK1() {
+ // K=1: only the earliest row per device
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+ "2021-01-01T09:01:00.000Z,d3,10.0,1,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo"
+ + ") WHERE rn <= 1 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeKLargerThanData() {
+ // K=100: larger than any partition, so all rows are returned
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+ "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+ "2021-01-01T09:09:00.000Z,d1,3.0,3,",
+ "2021-01-01T09:10:00.000Z,d1,1.0,4,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+ "2021-01-01T09:01:00.000Z,d3,10.0,1,",
+ "2021-01-01T09:02:00.000Z,d3,20.0,2,",
+ "2021-01-01T09:03:00.000Z,d3,30.0,3,",
+ "2021-01-01T09:04:00.000Z,d3,40.0,4,",
+ "2021-01-01T09:06:00.000Z,d3,50.0,5,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo"
+ + ") WHERE rn <= 100 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testLimitPushDownOrderByTime() {
+ // LIMIT pushdown: row_number() OVER (PARTITION BY device ORDER BY time) LIMIT 4
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+ "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+ "2021-01-01T09:09:00.000Z,d1,3.0,3,",
+ "2021-01-01T09:10:00.000Z,d1,1.0,4,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo"
+ + ") ORDER BY device, time LIMIT 4",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeWithLessThan() {
+ // Use rn < 3 instead of rn <= 2 (should give same result as rn <= 2)
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:05:00.000Z,d1,3.0,1,",
+ "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,1,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+ "2021-01-01T09:01:00.000Z,d3,10.0,1,",
+ "2021-01-01T09:02:00.000Z,d3,20.0,2,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo"
+ + ") WHERE rn < 3 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeSelectSubsetColumns() {
+ // Only select time and device (not the ranking column)
+ String[] expectedHeader = new String[] {"time", "device", "value"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:05:00.000Z,d1,3.0,",
+ "2021-01-01T09:07:00.000Z,d1,5.0,",
+ "2021-01-01T09:09:00.000Z,d1,3.0,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,",
+ "2021-01-01T09:01:00.000Z,d3,10.0,",
+ "2021-01-01T09:02:00.000Z,d3,20.0,",
+ "2021-01-01T09:03:00.000Z,d3,30.0,",
+ };
+ tableResultSetEqualTest(
+ "SELECT time, device, value FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo"
+ + ") WHERE rn <= 3 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeEqual() {
+ // rn = 2: only the second row per device
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:07:00.000Z,d1,5.0,2,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,2,",
+ "2021-01-01T09:02:00.000Z,d3,20.0,2,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time) as rn FROM demo"
+ + ") WHERE rn = 2 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ // ==================== ORDER BY time DESC tests ====================
+
+ @Test
+ public void testFilterPushDownOrderByTimeDesc() {
+ // ROW_NUMBER() OVER (PARTITION BY device ORDER BY time DESC) WHERE rn <= 2
+ // Should use LimitKRankingOperator: take latest 2 rows per device
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+ "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+ "2021-01-01T09:04:00.000Z,d3,40.0,2,",
+ "2021-01-01T09:06:00.000Z,d3,50.0,1,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo"
+ + ") WHERE rn <= 2 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeDescK1() {
+ // K=1: only the latest row per device
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+ "2021-01-01T09:06:00.000Z,d3,50.0,1,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo"
+ + ") WHERE rn <= 1 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeDescKLargerThanData() {
+ // K=100: larger than any partition, so all rows are returned
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:05:00.000Z,d1,3.0,4,",
+ "2021-01-01T09:07:00.000Z,d1,5.0,3,",
+ "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+ "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+ "2021-01-01T09:01:00.000Z,d3,10.0,5,",
+ "2021-01-01T09:02:00.000Z,d3,20.0,4,",
+ "2021-01-01T09:03:00.000Z,d3,30.0,3,",
+ "2021-01-01T09:04:00.000Z,d3,40.0,2,",
+ "2021-01-01T09:06:00.000Z,d3,50.0,1,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo"
+ + ") WHERE rn <= 100 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testLimitPushDownOrderByTimeDesc() {
+ // LIMIT pushdown: row_number() OVER (PARTITION BY device ORDER BY time DESC) LIMIT 4
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+ "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+ "2021-01-01T09:07:00.000Z,d1,5.0,3,",
+ "2021-01-01T09:05:00.000Z,d1,3.0,4,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo"
+ + ") ORDER BY device, time DESC LIMIT 4",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeDescWithLessThan() {
+ // Use rn < 3 instead of rn <= 2 (should give same result as rn <= 2)
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+ "2021-01-01T09:10:00.000Z,d1,1.0,1,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,1,",
+ "2021-01-01T09:04:00.000Z,d3,40.0,2,",
+ "2021-01-01T09:06:00.000Z,d3,50.0,1,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo"
+ + ") WHERE rn < 3 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeDescSelectSubsetColumns() {
+ // Only select time, device, value (not the ranking column)
+ String[] expectedHeader = new String[] {"time", "device", "value"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:07:00.000Z,d1,5.0,",
+ "2021-01-01T09:09:00.000Z,d1,3.0,",
+ "2021-01-01T09:10:00.000Z,d1,1.0,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,",
+ "2021-01-01T09:15:00.000Z,d2,4.0,",
+ "2021-01-01T09:03:00.000Z,d3,30.0,",
+ "2021-01-01T09:04:00.000Z,d3,40.0,",
+ "2021-01-01T09:06:00.000Z,d3,50.0,",
+ };
+ tableResultSetEqualTest(
+ "SELECT time, device, value FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo"
+ + ") WHERE rn <= 3 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testFilterPushDownOrderByTimeDescEqual() {
+ // rn = 2: only the second-latest row per device
+ String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
+ String[] retArray =
+ new String[] {
+ "2021-01-01T09:09:00.000Z,d1,3.0,2,",
+ "2021-01-01T09:08:00.000Z,d2,2.0,2,",
+ "2021-01-01T09:04:00.000Z,d3,40.0,2,",
+ };
+ tableResultSetEqualTest(
+ "SELECT * FROM ("
+ + "SELECT *, row_number() OVER (PARTITION BY device ORDER BY time DESC) as rn FROM demo"
+ + ") WHERE rn = 2 ORDER BY device, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java
new file mode 100644
index 0000000000000..11f75ab9ddee4
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperator.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.window;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
+import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash;
+import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.createGroupByHash;
+
+/**
+ * Streaming operator optimized for {@code PARTITION BY device ORDER BY time} with a top-K filter.
+ *
+ *
Unlike {@link TopKRankingOperator} which buffers all input and uses heap-based selection, this
+ * operator exploits the fact that input data is already sorted by (device, time). It simply counts
+ * rows per partition and emits the first K rows for each, discarding the rest. This yields O(N)
+ * time with minimal memory overhead, compared to O(N log K) for the general TopK path.
+ */
+public class LimitKRankingOperator implements ProcessOperator {
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(LimitKRankingOperator.class);
+
+ private final OperatorContext operatorContext;
+ private final Operator inputOperator;
+ private final List inputDataTypes;
+
+ private final List outputChannels;
+ private final List partitionChannels;
+ private final int maxRowCountPerPartition;
+ private final boolean produceRowNumber;
+
+ private final Optional groupByHash;
+ private final Map partitionRowCounts;
+ private final TsBlockBuilder tsBlockBuilder;
+
+ public LimitKRankingOperator(
+ OperatorContext operatorContext,
+ Operator inputOperator,
+ List inputDataTypes,
+ List outputChannels,
+ List partitionChannels,
+ List partitionTSDataTypes,
+ int maxRowCountPerPartition,
+ boolean produceRowNumber,
+ int expectedPositions) {
+ this.operatorContext = operatorContext;
+ this.inputOperator = inputOperator;
+ this.inputDataTypes = inputDataTypes;
+ this.outputChannels = ImmutableList.copyOf(outputChannels);
+ this.partitionChannels = ImmutableList.copyOf(partitionChannels);
+ this.maxRowCountPerPartition = maxRowCountPerPartition;
+ this.produceRowNumber = produceRowNumber;
+
+ List outputDataTypes = new ArrayList<>();
+ for (int channel : outputChannels) {
+ outputDataTypes.add(inputDataTypes.get(channel));
+ }
+ if (produceRowNumber) {
+ outputDataTypes.add(TSDataType.INT64);
+ }
+ this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+
+ if (partitionChannels.isEmpty()) {
+ this.groupByHash = Optional.empty();
+ } else {
+ List partitionTypes = new ArrayList<>(partitionTSDataTypes.size());
+ for (TSDataType tsDataType : partitionTSDataTypes) {
+ partitionTypes.add(InternalTypeManager.fromTSDataType(tsDataType));
+ }
+ this.groupByHash =
+ Optional.of(
+ createGroupByHash(partitionTypes, false, expectedPositions, UpdateMemory.NOOP));
+ }
+
+ this.partitionRowCounts = new HashMap<>(expectedPositions);
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() throws Exception {
+ TsBlock input = inputOperator.nextWithTimer();
+ if (input == null) {
+ return null;
+ }
+
+ int positionCount = input.getPositionCount();
+ int[] partitionIds = getPartitionIds(input);
+
+ for (int position = 0; position < positionCount; position++) {
+ int partitionId = groupByHash.isPresent() ? partitionIds[position] : 0;
+ long rowCount = partitionRowCounts.getOrDefault(partitionId, 0L);
+
+ if (rowCount < maxRowCountPerPartition) {
+ emitRow(input, position, rowCount + 1);
+ }
+
+ partitionRowCounts.put(partitionId, rowCount + 1);
+ }
+
+ if (tsBlockBuilder.getPositionCount() == 0) {
+ tsBlockBuilder.reset();
+ return null;
+ }
+
+ TsBlock result =
+ tsBlockBuilder.build(
+ new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount()));
+ tsBlockBuilder.reset();
+ return result;
+ }
+
+ private void emitRow(TsBlock tsBlock, int position, long rowNumber) {
+ for (int i = 0; i < outputChannels.size(); i++) {
+ Column column = tsBlock.getColumn(outputChannels.get(i));
+ ColumnBuilder columnBuilder = tsBlockBuilder.getColumnBuilder(i);
+ if (column.isNull(position)) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.write(column, position);
+ }
+ }
+
+ if (produceRowNumber) {
+ tsBlockBuilder.getColumnBuilder(outputChannels.size()).writeLong(rowNumber);
+ }
+
+ tsBlockBuilder.declarePosition();
+ }
+
+ private int[] getPartitionIds(TsBlock tsBlock) {
+ if (groupByHash.isPresent()) {
+ Column[] partitionColumns = new Column[partitionChannels.size()];
+ for (int i = 0; i < partitionChannels.size(); i++) {
+ partitionColumns[i] = tsBlock.getColumn(partitionChannels.get(i));
+ }
+ return groupByHash.get().getGroupIds(partitionColumns);
+ }
+ return new int[0];
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return inputOperator.hasNextWithTimer();
+ }
+
+ @Override
+ public void close() throws Exception {
+ inputOperator.close();
+ }
+
+ @Override
+ public boolean isFinished() throws Exception {
+ return !hasNext();
+ }
+
+ @Override
+ public ListenableFuture> isBlocked() {
+ return inputOperator.isBlocked();
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemoryFromInput = inputOperator.calculateMaxPeekMemoryWithCounter();
+ long maxTsBlockSize = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ return Math.max(maxPeekMemoryFromInput, maxTsBlockSize)
+ + inputOperator.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return inputOperator.calculateRetainedSizeAfterCallingNext();
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ + tsBlockBuilder.getRetainedSizeInBytes();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 1bc788251a764..cfcfb4a99f469 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -101,6 +101,7 @@
import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.IrRowPatternToProgramRewriter;
import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Matcher;
import org.apache.iotdb.db.queryengine.execution.operator.process.rowpattern.matcher.Program;
+import org.apache.iotdb.db.queryengine.execution.operator.process.window.LimitKRankingOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.window.RowNumberOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.window.TableWindowOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.window.TopKRankingOperator;
@@ -198,6 +199,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
@@ -4372,4 +4374,46 @@ public Operator visitTopKRanking(TopKRankingNode node, LocalExecutionPlanContext
1000,
Optional.empty());
}
+
+ @Override
+ public Operator visitLimitKRanking(LimitKRankingNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ LimitKRankingOperator.class.getSimpleName());
+
+ List partitionBySymbols = node.getSpecification().getPartitionBy();
+ Map childLayout =
+ makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols());
+ List partitionChannels = getChannelsForSymbols(partitionBySymbols, childLayout);
+ List inputDataTypes =
+ getOutputColumnTypes(node.getChild(), context.getTypeProvider());
+ List partitionTypes =
+ partitionChannels.stream().map(inputDataTypes::get).collect(toImmutableList());
+
+ ImmutableList.Builder outputChannels = ImmutableList.builder();
+ for (int i = 0; i < inputDataTypes.size(); i++) {
+ outputChannels.add(i);
+ }
+
+ ImmutableMap.Builder outputMappings = ImmutableMap.builder();
+ outputMappings.putAll(childLayout);
+ int channel = inputDataTypes.size();
+ outputMappings.put(node.getRankingSymbol(), channel);
+
+ return new LimitKRankingOperator(
+ operatorContext,
+ child,
+ inputDataTypes,
+ outputChannels.build(),
+ partitionChannels,
+ partitionTypes,
+ node.getMaxRowCountPerPartition(),
+ true,
+ 1000);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 180241b4192cd..ae24abe803d55 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -79,6 +79,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1155,6 +1156,24 @@ public List visitTopKRanking(TopKRankingNode node, GraphContext context)
return render(node, boxValue, context);
}
+ @Override
+ public List visitLimitKRanking(LimitKRankingNode node, GraphContext context) {
+ List boxValue = new ArrayList<>();
+
+ boxValue.add(String.format("LimitKRanking-%s", node.getPlanNodeId().getId()));
+ boxValue.add(String.format("RankingSymbol: %s", node.getRankingSymbol()));
+ boxValue.add(String.format("MaxRowCount: %d", node.getMaxRowCountPerPartition()));
+ DataOrganizationSpecification specification = node.getSpecification();
+ if (!specification.getPartitionBy().isEmpty()) {
+ boxValue.add("Partition by: [" + Joiner.on(", ").join(specification.getPartitionBy()) + "]");
+ }
+ specification
+ .getOrderingScheme()
+ .ifPresent(orderingScheme -> boxValue.add("Order by: " + orderingScheme));
+
+ return render(node, boxValue, context);
+ }
+
private String printRegion(TRegionReplicaSet regionReplicaSet) {
return String.format(
"Partition: %s",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 55aaefe8be618..c9c118e6a3cd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -125,6 +125,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -324,6 +325,8 @@ public enum PlanNodeType {
TABLE_ROW_NUMBER_NODE((short) 1038),
TABLE_VALUES_NODE((short) 1039),
+ TABLE_LIMITK_RANKING_NODE((short) 1040),
+
RELATIONAL_INSERT_TABLET((short) 2000),
RELATIONAL_INSERT_ROW((short) 2001),
RELATIONAL_INSERT_ROWS((short) 2002),
@@ -727,6 +730,8 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) {
return RowNumberNode.deserialize(buffer);
case 1039:
return ValuesNode.deserialize(buffer);
+ case 1040:
+ return LimitKRankingNode.deserialize(buffer);
case 2000:
return RelationalInsertTabletNode.deserialize(buffer);
case 2001:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 44f1cd8bc1f67..d1a9958dea52a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -795,6 +795,12 @@ public R visitTopKRanking(
return visitSingleChildProcess(node, context);
}
+ public R visitLimitKRanking(
+ org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode node,
+ C context) {
+ return visitSingleChildProcess(node, context);
+ }
+
public R visitRowNumber(RowNumberNode node, C context) {
return visitSingleChildProcess(node, context);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7072b5f519f73..9defd43755bfd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -69,6 +69,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
@@ -1914,6 +1915,36 @@ public List visitTopKRanking(TopKRankingNode node, PlanContext context
}
}
+ @Override
+ public List visitLimitKRanking(LimitKRankingNode node, PlanContext context) {
+ Optional orderingScheme = node.getSpecification().getOrderingScheme();
+ if (orderingScheme.isPresent()) {
+ context.setExpectedOrderingScheme(orderingScheme.get());
+ nodeOrderingMap.put(node.getPlanNodeId(), orderingScheme.get());
+ }
+
+ checkArgument(
+ node.getChildren().size() == 1, "Size of LimitKRankingNode can only be 1 in logical plan.");
+ boolean canSplitPushDown = node.getChild() instanceof GroupNode;
+ if (!canSplitPushDown) {
+ node.setChild(((SortNode) node.getChild()).getChild());
+ }
+ List childrenNodes = node.getChildren().get(0).accept(this, context);
+
+ if (childrenNodes.size() == 1) {
+ node.setChild(childrenNodes.get(0));
+ return Collections.singletonList(node);
+ } else if (!canSplitPushDown) {
+ CollectNode collectNode =
+ new CollectNode(queryId.genPlanNodeId(), node.getChildren().get(0).getOutputSymbols());
+ childrenNodes.forEach(collectNode::addChild);
+ node.setChild(collectNode);
+ return Collections.singletonList(node);
+ } else {
+ return splitForEachChild(node, childrenNodes);
+ }
+ }
+
@Override
public List visitUnion(UnionNode node, PlanContext context) {
context.clearExpectedOrderingScheme();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
index d7ecab9cabfe4..c1c5e7734ce35 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownFilterIntoWindow.java
@@ -19,10 +19,12 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValuesNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
@@ -42,6 +44,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.canUseLimitKRanking;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.filter;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.source;
@@ -90,15 +93,26 @@ public Result apply(FilterNode node, Captures captures, Context context) {
new ValuesNode(node.getPlanNodeId(), node.getOutputSymbols(), ImmutableList.of()));
}
- TopKRankingNode newSource =
- new TopKRankingNode(
- windowNode.getPlanNodeId(),
- windowNode.getChild(),
- windowNode.getSpecification(),
- rankingType.get(),
- rankingSymbol,
- upperBound.getAsInt(),
- false);
+ PlanNode newSource;
+ if (canUseLimitKRanking(windowNode, rankingType.get(), context.getSymbolAllocator())) {
+ newSource =
+ new LimitKRankingNode(
+ windowNode.getPlanNodeId(),
+ windowNode.getChild(),
+ windowNode.getSpecification(),
+ rankingSymbol,
+ upperBound.getAsInt());
+ } else {
+ newSource =
+ new TopKRankingNode(
+ windowNode.getPlanNodeId(),
+ windowNode.getChild(),
+ windowNode.getSpecification(),
+ rankingType.get(),
+ rankingSymbol,
+ upperBound.getAsInt(),
+ false);
+ }
if (needToKeepFilter(node.getPredicate(), rankingSymbol, upperBound.getAsInt())) {
return Result.ofPlanNode(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
index e3ce8619c5beb..b8f2f415d0a55 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PushDownLimitIntoWindow.java
@@ -20,7 +20,9 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
@@ -34,6 +36,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static java.lang.Math.toIntExact;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.canUseLimitKRanking;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.toTopNRankingType;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChildReplacer.replaceChildren;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.limit;
@@ -83,18 +86,31 @@ public Result apply(LimitNode node, Captures captures, Context context) {
Optional rankingType = toTopNRankingType(source);
int limit = toIntExact(node.getCount());
- TopKRankingNode topNRowNumberNode =
- new TopKRankingNode(
- source.getPlanNodeId(),
- source.getChild(),
- source.getSpecification(),
- rankingType.get(),
- getOnlyElement(source.getWindowFunctions().keySet()),
- limit,
- false);
+
+ PlanNode rankingNode;
+ if (canUseLimitKRanking(source, rankingType.get(), context.getSymbolAllocator())) {
+ rankingNode =
+ new LimitKRankingNode(
+ source.getPlanNodeId(),
+ source.getChild(),
+ source.getSpecification(),
+ getOnlyElement(source.getWindowFunctions().keySet()),
+ limit);
+ } else {
+ rankingNode =
+ new TopKRankingNode(
+ source.getPlanNodeId(),
+ source.getChild(),
+ source.getSpecification(),
+ rankingType.get(),
+ getOnlyElement(source.getWindowFunctions().keySet()),
+ limit,
+ false);
+ }
+
if (rankingType.get() == ROW_NUMBER && source.getSpecification().getPartitionBy().isEmpty()) {
- return Result.ofPlanNode(topNRowNumberNode);
+ return Result.ofPlanNode(rankingNode);
}
- return Result.ofPlanNode(replaceChildren(node, ImmutableList.of(topNRowNumberNode)));
+ return Result.ofPlanNode(replaceChildren(node, ImmutableList.of(rankingNode)));
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
index 5845ffd422105..a96681821d164 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/Util.java
@@ -22,7 +22,9 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode;
@@ -32,6 +34,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import org.apache.tsfile.read.common.type.TimestampType;
+import org.apache.tsfile.read.common.type.Type;
import java.util.Collection;
import java.util.List;
@@ -142,4 +146,34 @@ public static Optional toTopNRankingType(WindowNode
}
return Optional.empty();
}
+
+ /**
+ * Returns true when the window is ROW_NUMBER with ORDER BY on a single TIMESTAMP column (ASC or
+ * DESC). In IoTDB, data is naturally sorted by (device, time) and table scans support both
+ * forward and reverse iteration, so we can use the streaming LimitKRankingNode instead of the
+ * buffered TopKRankingNode for both ASC (earliest K) and DESC (latest K).
+ */
+ public static boolean canUseLimitKRanking(
+ WindowNode windowNode,
+ TopKRankingNode.RankingType rankingType,
+ SymbolAllocator symbolAllocator) {
+ if (rankingType != ROW_NUMBER && rankingType != RANK) {
+ return false;
+ }
+
+ Optional orderingScheme = windowNode.getSpecification().getOrderingScheme();
+ if (!orderingScheme.isPresent()) {
+ return false;
+ }
+
+ List orderBy = orderingScheme.get().getOrderBy();
+ if (orderBy.size() != 1) {
+ return false;
+ }
+
+ Symbol orderSymbol = orderBy.get(0);
+
+ Type type = symbolAllocator.getTypes().getTableModelType(orderSymbol);
+ return type instanceof TimestampType;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java
new file mode 100644
index 0000000000000..a78c9f9bbdafc
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitKRankingNode.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Plan node for the streaming per-partition limit optimization. Used when the input data is already
+ * sorted by (partition_keys, time), so we can take the first K rows per partition without sorting.
+ */
+public class LimitKRankingNode extends SingleChildProcessNode {
+
+ private final DataOrganizationSpecification specification;
+ private final Symbol rankingSymbol;
+ private final int maxRowCountPerPartition;
+
+ public LimitKRankingNode(
+ PlanNodeId id,
+ DataOrganizationSpecification specification,
+ Symbol rankingSymbol,
+ int maxRowCountPerPartition) {
+ super(id);
+ this.specification = specification;
+ this.rankingSymbol = rankingSymbol;
+ this.maxRowCountPerPartition = maxRowCountPerPartition;
+ }
+
+ public LimitKRankingNode(
+ PlanNodeId id,
+ PlanNode child,
+ DataOrganizationSpecification specification,
+ Symbol rankingSymbol,
+ int maxRowCountPerPartition) {
+ super(id, child);
+ this.specification = specification;
+ this.rankingSymbol = rankingSymbol;
+ this.maxRowCountPerPartition = maxRowCountPerPartition;
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new LimitKRankingNode(
+ getPlanNodeId(), specification, rankingSymbol, maxRowCountPerPartition);
+ }
+
+ @Override
+ public R accept(PlanVisitor visitor, C context) {
+ return visitor.visitLimitKRanking(this, context);
+ }
+
+ public DataOrganizationSpecification getSpecification() {
+ return specification;
+ }
+
+ public Symbol getRankingSymbol() {
+ return rankingSymbol;
+ }
+
+ public int getMaxRowCountPerPartition() {
+ return maxRowCountPerPartition;
+ }
+
+ @Override
+ public List getOutputColumnNames() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List getOutputSymbols() {
+ return ImmutableList.builder()
+ .addAll(getChild().getOutputSymbols())
+ .add(rankingSymbol)
+ .build();
+ }
+
+ @Override
+ public PlanNode replaceChildren(List newChildren) {
+ return new LimitKRankingNode(
+ id,
+ Iterables.getOnlyElement(newChildren),
+ specification,
+ rankingSymbol,
+ maxRowCountPerPartition);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.TABLE_LIMITK_RANKING_NODE.serialize(byteBuffer);
+ specification.serialize(byteBuffer);
+ Symbol.serialize(rankingSymbol, byteBuffer);
+ ReadWriteIOUtils.write(maxRowCountPerPartition, byteBuffer);
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ PlanNodeType.TABLE_LIMITK_RANKING_NODE.serialize(stream);
+ specification.serialize(stream);
+ Symbol.serialize(rankingSymbol, stream);
+ ReadWriteIOUtils.write(maxRowCountPerPartition, stream);
+ }
+
+ public static LimitKRankingNode deserialize(ByteBuffer byteBuffer) {
+ DataOrganizationSpecification specification =
+ DataOrganizationSpecification.deserialize(byteBuffer);
+ Symbol rankingSymbol = Symbol.deserialize(byteBuffer);
+ int maxRowCountPerPartition = ReadWriteIOUtils.readInt(byteBuffer);
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new LimitKRankingNode(planNodeId, specification, rankingSymbol, maxRowCountPerPartition);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ LimitKRankingNode that = (LimitKRankingNode) o;
+ return Objects.equal(specification, that.specification)
+ && Objects.equal(rankingSymbol, that.rankingSymbol)
+ && Objects.equal(maxRowCountPerPartition, that.maxRowCountPerPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ super.hashCode(), specification, rankingSymbol, maxRowCountPerPartition);
+ }
+
+ @Override
+ public String toString() {
+ return "LimitKRankingNode-" + this.getPlanNodeId();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
index bb276f07150b9..bdc986e63da41 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode;
@@ -31,6 +32,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import java.util.Collections;
@@ -72,10 +74,13 @@ public PlanNode visitSort(SortNode node, Context context) {
OrderingScheme orderingScheme = node.getOrderingScheme();
if (context.canEliminateSort()
&& newContext.getTotalDeviceEntrySize() == 1
- && orderingScheme.getOrderBy().get(0).getName().equals(context.getTimeColumnName())) {
+ && orderingScheme.getOrderBy().get(0).getName().equals(context.getTimeColumnName())
+ && timeSortDirectionMatchesScanOrder(orderingScheme, newContext)) {
return child;
}
- return context.canEliminateSort() && node.isOrderByAllIdsAndTime()
+ return context.canEliminateSort()
+ && node.isOrderByAllIdsAndTime()
+ && timeSortDirectionMatchesScanOrder(orderingScheme, newContext)
? child
: node.replaceChildren(Collections.singletonList(child));
}
@@ -86,17 +91,39 @@ public PlanNode visitStreamSort(StreamSortNode node, Context context) {
PlanNode child = node.getChild().accept(this, newContext);
context.setCannotEliminateSort(newContext.cannotEliminateSort);
return context.canEliminateSort()
- && (node.isOrderByAllIdsAndTime()
+ && ((node.isOrderByAllIdsAndTime()
+ && timeSortDirectionMatchesScanOrder(node.getOrderingScheme(), newContext))
|| node.getStreamCompareKeyEndIndex()
== node.getOrderingScheme().getOrderBy().size() - 1)
? child
: node.replaceChildren(Collections.singletonList(child));
}
+ /**
+ * Checks whether the sort direction on the time column matches the underlying scan order. When
+ * the scan is DESC (e.g. due to LimitKRanking with ORDER BY time DESC), a sort expecting ASC
+ * must not be eliminated.
+ */
+ private boolean timeSortDirectionMatchesScanOrder(
+ OrderingScheme orderingScheme, Context childContext) {
+ if (childContext.getTimeColumnName() == null) {
+ return true;
+ }
+ for (Symbol symbol : orderingScheme.getOrderBy()) {
+ if (symbol.getName().equals(childContext.getTimeColumnName())) {
+ SortOrder sortOrder = orderingScheme.getOrdering(symbol);
+ boolean scanAscending = childContext.getScanOrder() == Ordering.ASC;
+ return sortOrder.isAscending() == scanAscending;
+ }
+ }
+ return true;
+ }
+
@Override
public PlanNode visitDeviceTableScan(DeviceTableScanNode node, Context context) {
context.addDeviceEntrySize(node.getDeviceEntries().size());
context.setTimeColumnName(node.getTimeColumn().map(Symbol::getName).orElse(null));
+ context.setScanOrder(node.getScanOrder());
return node;
}
@@ -152,6 +179,8 @@ private static class Context {
private String timeColumnName = null;
+ private Ordering scanOrder = Ordering.ASC;
+
Context() {}
public void addDeviceEntrySize(int deviceEntrySize) {
@@ -177,5 +206,13 @@ public String getTimeColumnName() {
public void setTimeColumnName(String timeColumnName) {
this.timeColumnName = timeColumnName;
}
+
+ public Ordering getScanOrder() {
+ return scanOrder;
+ }
+
+ public void setScanOrder(Ordering scanOrder) {
+ this.scanOrder = scanOrder;
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
index 002e59f124c9f..e03140fb42aa6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SymbolMapper.java
@@ -30,6 +30,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExpressionTreeRewriter;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -300,6 +301,15 @@ public TopKRankingNode map(TopKRankingNode node, PlanNode source) {
node.isPartial());
}
+ public LimitKRankingNode map(LimitKRankingNode node, PlanNode source) {
+ return new LimitKRankingNode(
+ node.getPlanNodeId(),
+ source,
+ mapAndDistinct(node.getSpecification()),
+ map(node.getRankingSymbol()),
+ node.getMaxRowCountPerPartition());
+ }
+
public RowNumberNode map(RowNumberNode node, PlanNode source) {
return new RowNumberNode(
node.getPlanNodeId(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index b7f2e9eaaa851..540e20d043651 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -46,6 +46,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
@@ -650,6 +651,17 @@ public PlanAndMappings visitTopKRanking(TopKRankingNode node, UnaliasContext con
return new PlanAndMappings(rewrittenTopKRanking, mapping);
}
+ @Override
+ public PlanAndMappings visitLimitKRanking(LimitKRankingNode node, UnaliasContext context) {
+ PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
+ Map mapping = new HashMap<>(rewrittenSource.getMappings());
+ SymbolMapper mapper = symbolMapper(mapping);
+
+ LimitKRankingNode rewrittenNode = mapper.map(node, rewrittenSource.getRoot());
+
+ return new PlanAndMappings(rewrittenNode, mapping);
+ }
+
@Override
public PlanAndMappings visitOutput(OutputNode node, UnaliasContext context) {
PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java
new file mode 100644
index 0000000000000..9acae751378b3
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/LimitKRankingOperatorTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.window;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class LimitKRankingOperatorTest {
+ private static final ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "limitKRanking-test-instance-notification");
+
+ private static final List INPUT_DATA_TYPES =
+ Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32);
+
+ @Test
+ public void testSingleBlockMultiPartition() {
+ // Input: d1 has 3 rows, d2 has 3 rows; K=2 → keep first 2 per partition
+ long[][] timeArray = {{1, 2, 3, 4, 5, 6}};
+ String[][] deviceArray = {{"d1", "d1", "d1", "d2", "d2", "d2"}};
+ int[][] valueArray = {{10, 20, 30, 40, 50, 60}};
+
+ long[] expectTime = {1, 2, 4, 5};
+ String[] expectDevice = {"d1", "d1", "d2", "d2"};
+ int[] expectValue = {10, 20, 40, 50};
+
+ verifyOperatorOutput(
+ timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
+ }
+
+ @Test
+ public void testPartitionCrossMultiBlocks() {
+ // d1 spans block0 and block1; d2 spans block1 and block2; K=3
+ long[][] timeArray = {{1, 2}, {3, 4, 5}, {6, 7, 8}};
+ String[][] deviceArray = {{"d1", "d1"}, {"d1", "d1", "d2"}, {"d2", "d2", "d2"}};
+ int[][] valueArray = {{10, 20}, {30, 40, 50}, {60, 70, 80}};
+
+ long[] expectTime = {1, 2, 3, 5, 6, 7};
+ String[] expectDevice = {"d1", "d1", "d1", "d2", "d2", "d2"};
+ int[] expectValue = {10, 20, 30, 50, 60, 70};
+
+ verifyOperatorOutput(
+ timeArray, deviceArray, valueArray, 3, false, expectTime, expectDevice, expectValue, null);
+ }
+
+ @Test
+ public void testWithRowNumber() {
+ long[][] timeArray = {{1, 2, 3, 4}};
+ String[][] deviceArray = {{"d1", "d1", "d2", "d2"}};
+ int[][] valueArray = {{10, 20, 30, 40}};
+
+ long[] expectTime = {1, 2, 3, 4};
+ String[] expectDevice = {"d1", "d1", "d2", "d2"};
+ int[] expectValue = {10, 20, 30, 40};
+ long[] expectRowNumber = {1, 2, 1, 2};
+
+ verifyOperatorOutput(
+ timeArray,
+ deviceArray,
+ valueArray,
+ 2,
+ true,
+ expectTime,
+ expectDevice,
+ expectValue,
+ expectRowNumber);
+ }
+
+ @Test
+ public void testKLargerThanData() {
+ // K=10, but only 2 rows per partition → all rows emitted
+ long[][] timeArray = {{1, 2, 3, 4}};
+ String[][] deviceArray = {{"d1", "d1", "d2", "d2"}};
+ int[][] valueArray = {{10, 20, 30, 40}};
+
+ long[] expectTime = {1, 2, 3, 4};
+ String[] expectDevice = {"d1", "d1", "d2", "d2"};
+ int[] expectValue = {10, 20, 30, 40};
+
+ verifyOperatorOutput(
+ timeArray, deviceArray, valueArray, 10, false, expectTime, expectDevice, expectValue, null);
+ }
+
+ @Test
+ public void testEntireBlockFiltered() {
+ // K=2, d1 gets 2 rows in block0 → block1 (all d1) should be fully filtered
+ long[][] timeArray = {{1, 2}, {3, 4}};
+ String[][] deviceArray = {{"d1", "d1"}, {"d1", "d1"}};
+ int[][] valueArray = {{10, 20}, {30, 40}};
+
+ long[] expectTime = {1, 2};
+ String[] expectDevice = {"d1", "d1"};
+ int[] expectValue = {10, 20};
+
+ verifyOperatorOutput(
+ timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
+ }
+
+ @Test
+ public void testNoPartition() {
+ // No partition columns → global limit K=3
+ long[][] timeArray = {{1, 2, 3, 4, 5}};
+ String[][] deviceArray = {{"d1", "d1", "d2", "d2", "d2"}};
+ int[][] valueArray = {{10, 20, 30, 40, 50}};
+
+ long[] expectTime = {1, 2, 3};
+ String[] expectDevice = {"d1", "d1", "d2"};
+ int[] expectValue = {10, 20, 30};
+
+ verifyOperatorOutputNoPartition(
+ timeArray, deviceArray, valueArray, 3, false, expectTime, expectDevice, expectValue);
+ }
+
+ @Test
+ public void testPartialFilterInBlock() {
+ // Block has mixed: some rows pass, some are filtered
+ // d1: 4 rows, K=2 → only first 2 of d1 pass; d2: 2 rows, K=2 → both pass
+ long[][] timeArray = {{1, 2, 3, 4, 5, 6}};
+ String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2"}};
+ int[][] valueArray = {{10, 20, 30, 40, 50, 60}};
+
+ long[] expectTime = {1, 2, 5, 6};
+ String[] expectDevice = {"d1", "d1", "d2", "d2"};
+ int[] expectValue = {10, 20, 50, 60};
+
+ verifyOperatorOutput(
+ timeArray, deviceArray, valueArray, 2, false, expectTime, expectDevice, expectValue, null);
+ }
+
+ @Test
+ public void testNullInputBlock() {
+ // ChildOperator can return null TsBlocks
+ long[][] timeArray = {{1, 2}, null, {3, 4}};
+ String[][] deviceArray = {{"d1", "d1"}, null, {"d2", "d2"}};
+ int[][] valueArray = {{10, 20}, null, {30, 40}};
+
+ long[] expectTime = {1, 2, 3, 4};
+ String[] expectDevice = {"d1", "d1", "d2", "d2"};
+ int[] expectValue = {10, 20, 30, 40};
+
+ verifyOperatorOutput(
+ timeArray, deviceArray, valueArray, 5, false, expectTime, expectDevice, expectValue, null);
+ }
+
+ // ======================== Helper Methods ========================
+
+ private void verifyOperatorOutput(
+ long[][] timeArray,
+ String[][] deviceArray,
+ int[][] valueArray,
+ int k,
+ boolean produceRowNumber,
+ long[] expectTime,
+ String[] expectDevice,
+ int[] expectValue,
+ long[] expectRowNumber) {
+ int count = 0;
+ try (LimitKRankingOperator operator =
+ createOperator(timeArray, deviceArray, valueArray, k, produceRowNumber, true)) {
+ ListenableFuture> listenableFuture = operator.isBlocked();
+ listenableFuture.get();
+ while (!operator.isFinished() && operator.hasNext()) {
+ TsBlock tsBlock = operator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
+ assertEquals(expectTime[count], tsBlock.getColumn(0).getLong(i));
+ assertEquals(
+ expectDevice[count],
+ tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ assertEquals(expectValue[count], tsBlock.getColumn(2).getInt(i));
+ if (produceRowNumber && expectRowNumber != null) {
+ assertEquals(expectRowNumber[count], tsBlock.getColumn(3).getLong(i));
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ assertEquals(expectTime.length, count);
+ }
+
+ private void verifyOperatorOutputNoPartition(
+ long[][] timeArray,
+ String[][] deviceArray,
+ int[][] valueArray,
+ int k,
+ boolean produceRowNumber,
+ long[] expectTime,
+ String[] expectDevice,
+ int[] expectValue) {
+ int count = 0;
+ try (LimitKRankingOperator operator =
+ createOperator(timeArray, deviceArray, valueArray, k, produceRowNumber, false)) {
+ ListenableFuture> listenableFuture = operator.isBlocked();
+ listenableFuture.get();
+ while (!operator.isFinished() && operator.hasNext()) {
+ TsBlock tsBlock = operator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, count++) {
+ assertEquals(expectTime[count], tsBlock.getColumn(0).getLong(i));
+ assertEquals(
+ expectDevice[count],
+ tsBlock.getColumn(1).getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET));
+ assertEquals(expectValue[count], tsBlock.getColumn(2).getInt(i));
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ assertEquals(expectTime.length, count);
+ }
+
+ private LimitKRankingOperator createOperator(
+ long[][] timeArray,
+ String[][] deviceArray,
+ int[][] valueArray,
+ int k,
+ boolean produceRowNumber,
+ boolean hasPartition) {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+ PlanNodeId planNodeId = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId, LimitKRankingOperator.class.getSimpleName());
+
+ Operator childOperator = new ChildOperator(timeArray, deviceArray, valueArray, driverContext);
+
+ List outputChannels = Arrays.asList(0, 1, 2);
+ List partitionChannels;
+ List partitionTSDataTypes;
+ if (hasPartition) {
+ partitionChannels = Collections.singletonList(1);
+ partitionTSDataTypes = Collections.singletonList(TSDataType.TEXT);
+ } else {
+ partitionChannels = Collections.emptyList();
+ partitionTSDataTypes = Collections.emptyList();
+ }
+
+ return new LimitKRankingOperator(
+ driverContext.getOperatorContexts().get(0),
+ childOperator,
+ INPUT_DATA_TYPES,
+ outputChannels,
+ partitionChannels,
+ partitionTSDataTypes,
+ k,
+ produceRowNumber,
+ 16);
+ }
+
+ static class ChildOperator implements Operator {
+ private int index;
+ private final long[][] timeArray;
+ private final String[][] deviceArray;
+ private final int[][] valueArray;
+ private final DriverContext driverContext;
+
+ ChildOperator(
+ long[][] timeArray,
+ String[][] deviceArray,
+ int[][] valueArray,
+ DriverContext driverContext) {
+ this.timeArray = timeArray;
+ this.deviceArray = deviceArray;
+ this.valueArray = valueArray;
+ this.driverContext = driverContext;
+ this.index = 0;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return driverContext.getOperatorContexts().get(0);
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length,
+ Arrays.asList(TSDataType.TIMESTAMP, TSDataType.TEXT, TSDataType.INT32));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getColumnBuilder(0).writeLong(timeArray[index][i]);
+ builder
+ .getColumnBuilder(1)
+ .writeBinary(new Binary(deviceArray[index][i], TSFileConfig.STRING_CHARSET));
+ builder.getColumnBuilder(2).writeInt(valueArray[index][i]);
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build(
+ new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, builder.getPositionCount()));
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < timeArray.length;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= timeArray.length;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
index e31f2f7e58065..5f3dc0b3dcca4 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/WindowFunctionOptimizationTest.java
@@ -31,6 +31,8 @@
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.group;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.limit;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.limitKRanking;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.mergeSort;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.rowNumber;
@@ -291,10 +293,246 @@ public void testRowNumberPushDown() {
* │ └──TableScan
* └──ExchangeNode
* └──RowNumberNode
- * └──TableScan
+ * └──MergeSortNode
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * └──ExchangeNode
+ * └──TableScan
*/
assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange(), exchange())));
assertPlan(planTester.getFragmentPlan(1), rowNumber(tableScan));
assertPlan(planTester.getFragmentPlan(2), rowNumber(tableScan));
+ assertPlan(planTester.getFragmentPlan(3), rowNumber(mergeSort(exchange(), exchange())));
+ assertPlan(planTester.getFragmentPlan(4), tableScan);
+ assertPlan(planTester.getFragmentPlan(5), tableScan);
+ }
+
+ @Test
+ public void testLimitKRankingPushDownFilterOrderByTime() {
+ PlanTester planTester = new PlanTester();
+
+ // ORDER BY time triggers LimitKRankingNode instead of TopKRankingNode
+ String sql =
+ "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time) as rn FROM table1) WHERE rn <= 3";
+ LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+ PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+ /*
+ * └──OutputNode
+ * └──LimitKRankingNode
+ * └──SortNode
+ * └──TableScanNode
+ */
+ assertPlan(logicalQueryPlan, output(limitKRanking(sort(tableScan))));
+
+ /* OutputNode
+ * └──LimitKRankingNode
+ * └──CollectNode
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * └──ExchangeNode
+ * └──TableScan
+ */
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(limitKRanking(collect(exchange(), exchange(), exchange()))));
+ assertPlan(planTester.getFragmentPlan(1), tableScan);
+ assertPlan(planTester.getFragmentPlan(2), tableScan);
+ assertPlan(planTester.getFragmentPlan(3), tableScan);
+ }
+
+ @Test
+ public void testLimitKRankingPushDownFilterOrderByTimeWithAllTags() {
+ PlanTester planTester = new PlanTester();
+
+ // All tag columns in PARTITION BY + ORDER BY time → LimitKRankingNode with GroupNode push down
+ String sql =
+ "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time) as rn FROM table1) WHERE rn <= 2";
+ LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+ PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+ /*
+ * └──OutputNode
+ * └──LimitKRankingNode
+ * └──GroupNode
+ * └──TableScanNode
+ */
+ assertPlan(logicalQueryPlan, output(limitKRanking(group(tableScan))));
+
+ /* OutputNode
+ * └──CollectNode
+ * ├──ExchangeNode
+ * │ └──LimitKRankingNode
+ * │ └──TableScan
+ * ├──ExchangeNode
+ * │ └──LimitKRankingNode
+ * │ └──TableScan
+ * └──ExchangeNode
+ * └──LimitKRankingNode
+ * └──SortNode
+ * └──TableScan
+ */
+ assertPlan(
+ planTester.getFragmentPlan(0), output((collect(exchange(), exchange(), exchange()))));
+ assertPlan(planTester.getFragmentPlan(1), limitKRanking(tableScan));
+ assertPlan(planTester.getFragmentPlan(2), limitKRanking(tableScan));
+ assertPlan(planTester.getFragmentPlan(3), limitKRanking(mergeSort(exchange(), exchange())));
+ assertPlan(planTester.getFragmentPlan(4), tableScan);
+ assertPlan(planTester.getFragmentPlan(5), tableScan);
+ }
+
+ @Test
+ public void testLimitKRankingPushDownFilterOrderByTimeDesc() {
+ PlanTester planTester = new PlanTester();
+
+ // ORDER BY time DESC triggers LimitKRankingNode instead of TopKRankingNode
+ String sql =
+ "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time DESC) as rn FROM table1) WHERE rn <= 3";
+ LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+ PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+ /*
+ * └──OutputNode
+ * └──LimitKRankingNode
+ * └──SortNode
+ * └──TableScanNode
+ */
+ assertPlan(logicalQueryPlan, output(limitKRanking(sort(tableScan))));
+
+ /* OutputNode
+ * └──LimitKRankingNode
+ * └──CollectNode
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * └──ExchangeNode
+ * └──TableScan
+ */
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(limitKRanking(collect(exchange(), exchange(), exchange()))));
+ assertPlan(planTester.getFragmentPlan(1), tableScan);
+ assertPlan(planTester.getFragmentPlan(2), tableScan);
+ assertPlan(planTester.getFragmentPlan(3), tableScan);
+ }
+
+ @Test
+ public void testLimitKRankingPushDownFilterOrderByTimeDescWithAllTags() {
+ PlanTester planTester = new PlanTester();
+
+ // All tag columns in PARTITION BY + ORDER BY time DESC → LimitKRankingNode with GroupNode push
+ // down
+ String sql =
+ "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1, tag2, tag3 ORDER BY time DESC) as rn FROM table1) WHERE rn <= 2";
+ LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+ PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+ /*
+ * └──OutputNode
+ * └──LimitKRankingNode
+ * └──GroupNode
+ * └──TableScanNode
+ */
+ assertPlan(logicalQueryPlan, output(limitKRanking(group(tableScan))));
+
+ /* OutputNode
+ * └──CollectNode
+ * ├──ExchangeNode
+ * │ └──LimitKRankingNode
+ * │ └──TableScan
+ * ├──ExchangeNode
+ * │ └──LimitKRankingNode
+ * │ └──TableScan
+ * └──ExchangeNode
+ * └──LimitKRankingNode
+ * └──SortNode
+ * └──TableScan
+ */
+ assertPlan(
+ planTester.getFragmentPlan(0), output((collect(exchange(), exchange(), exchange()))));
+ assertPlan(planTester.getFragmentPlan(1), limitKRanking(tableScan));
+ assertPlan(planTester.getFragmentPlan(2), limitKRanking(tableScan));
+ assertPlan(planTester.getFragmentPlan(3), limitKRanking(mergeSort(exchange(), exchange())));
+ assertPlan(planTester.getFragmentPlan(4), tableScan);
+ assertPlan(planTester.getFragmentPlan(5), tableScan);
+ }
+
+ @Test
+ public void testLimitKRankingPushDownLimitOrderByTimeDesc() {
+ PlanTester planTester = new PlanTester();
+
+ // LIMIT pushdown + ORDER BY time DESC → LimitKRankingNode
+ String sql =
+ "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time DESC) as rn FROM table1) LIMIT 5";
+ LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+ PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+ /*
+ * └──OutputNode
+ * └──LimitNode
+ * └──LimitKRankingNode
+ * └──SortNode
+ * └──TableScanNode
+ */
+ assertPlan(logicalQueryPlan, output(limit(5, limitKRanking(sort(tableScan)))));
+
+ /* OutputNode
+ * └──LimitNode
+ * └──LimitKRankingNode
+ * └──CollectNode
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * └──ExchangeNode
+ * └──TableScan
+ */
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(limit(5, limitKRanking(collect(exchange(), exchange(), exchange())))));
+ assertPlan(planTester.getFragmentPlan(1), tableScan);
+ assertPlan(planTester.getFragmentPlan(2), tableScan);
+ assertPlan(planTester.getFragmentPlan(3), tableScan);
+ }
+
+ @Test
+ public void testLimitKRankingPushDownLimitOrderByTime() {
+ PlanTester planTester = new PlanTester();
+
+ // LIMIT pushdown + ORDER BY time → LimitKRankingNode
+ String sql =
+ "SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY tag1 ORDER BY time) as rn FROM table1) LIMIT 5";
+ LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql);
+ PlanMatchPattern tableScan = tableScan("testdb.table1");
+
+ /*
+ * └──OutputNode
+ * └──LimitNode
+ * └──LimitKRankingNode
+ * └──SortNode
+ * └──TableScanNode
+ */
+ assertPlan(logicalQueryPlan, output(limit(5, limitKRanking(sort(tableScan)))));
+
+ /* OutputNode
+ * └──LimitNode
+ * └──LimitKRankingNode
+ * └──CollectNode
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * ├──ExchangeNode
+ * │ └──TableScan
+ * └──ExchangeNode
+ * └──TableScan
+ */
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(limit(5, limitKRanking(collect(exchange(), exchange(), exchange())))));
+ assertPlan(planTester.getFragmentPlan(1), tableScan);
+ assertPlan(planTester.getFragmentPlan(2), tableScan);
+ assertPlan(planTester.getFragmentPlan(3), tableScan);
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 03f79fd2ec29a..f2b73d744599c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -40,6 +40,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitKRankingNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
@@ -477,6 +478,10 @@ public static PlanMatchPattern topKRanking(PlanMatchPattern source) {
return node(TopKRankingNode.class, source);
}
+ public static PlanMatchPattern limitKRanking(PlanMatchPattern source) {
+ return node(LimitKRankingNode.class, source);
+ }
+
public static PlanMatchPattern rowNumber(PlanMatchPattern source) {
return node(RowNumberNode.class, source);
}