regionProgress = new LinkedHashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ regionProgress.put(ReadWriteIOUtils.readString(buffer), RegionProgress.deserialize(buffer));
+ }
+ return new TopicProgress(regionProgress);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof TopicProgress)) {
+ return false;
+ }
+ final TopicProgress that = (TopicProgress) obj;
+ return Objects.equals(regionProgress, that.regionProgress);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(regionProgress);
+ }
+
+ @Override
+ public String toString() {
+ return "TopicProgress{" + "regionProgress=" + regionProgress + '}';
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WatermarkPayload.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WatermarkPayload.java
new file mode 100644
index 0000000000000..32dab88967497
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WatermarkPayload.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.poll;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Payload for {@link SubscriptionPollResponseType#WATERMARK}.
+ *
+ * Periodically injected by the server-side {@code ConsensusPrefetchingQueue} to report timestamp
+ * progress for a region. Carries the maximum data timestamp observed so far, enabling client-side
+ * {@code WatermarkProcessor} to advance its watermark even when a region is idle (no new data).
+ *
+ *
The {@code dataNodeId} identifies which DataNode emitted this watermark, allowing the client
+ * to track per-node progress across leader transitions.
+ */
+public class WatermarkPayload implements SubscriptionPollPayload {
+
+ /** Maximum data timestamp observed across all InsertNodes in this region's queue. */
+ private transient long watermarkTimestamp;
+
+ /** The DataNode ID that emitted this watermark. */
+ private transient int dataNodeId;
+
+ public WatermarkPayload() {}
+
+ public WatermarkPayload(final long watermarkTimestamp, final int dataNodeId) {
+ this.watermarkTimestamp = watermarkTimestamp;
+ this.dataNodeId = dataNodeId;
+ }
+
+ public long getWatermarkTimestamp() {
+ return watermarkTimestamp;
+ }
+
+ public int getDataNodeId() {
+ return dataNodeId;
+ }
+
+ @Override
+ public void serialize(final DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(watermarkTimestamp, stream);
+ ReadWriteIOUtils.write(dataNodeId, stream);
+ }
+
+ @Override
+ public SubscriptionPollPayload deserialize(final ByteBuffer buffer) {
+ watermarkTimestamp = ReadWriteIOUtils.readLong(buffer);
+ dataNodeId = ReadWriteIOUtils.readInt(buffer);
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "WatermarkPayload{watermarkTimestamp="
+ + watermarkTimestamp
+ + ", dataNodeId="
+ + dataNodeId
+ + '}';
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WriterId.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WriterId.java
new file mode 100644
index 0000000000000..ce21e07fe008d
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WriterId.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.poll;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class WriterId {
+
+ private final String regionId;
+ private final int nodeId;
+ private final long writerEpoch;
+
+ public WriterId(final String regionId, final int nodeId, final long writerEpoch) {
+ this.regionId = regionId;
+ this.nodeId = nodeId;
+ this.writerEpoch = writerEpoch;
+ }
+
+ public String getRegionId() {
+ return regionId;
+ }
+
+ public int getNodeId() {
+ return nodeId;
+ }
+
+ public long getWriterEpoch() {
+ return writerEpoch;
+ }
+
+ public void serialize(final DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(regionId, stream);
+ ReadWriteIOUtils.write(nodeId, stream);
+ ReadWriteIOUtils.write(writerEpoch, stream);
+ }
+
+ public static WriterId deserialize(final ByteBuffer buffer) {
+ return new WriterId(
+ ReadWriteIOUtils.readString(buffer),
+ ReadWriteIOUtils.readInt(buffer),
+ ReadWriteIOUtils.readLong(buffer));
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof WriterId)) {
+ return false;
+ }
+ final WriterId that = (WriterId) obj;
+ return nodeId == that.nodeId
+ && writerEpoch == that.writerEpoch
+ && Objects.equals(regionId, that.regionId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(regionId, nodeId, writerEpoch);
+ }
+
+ @Override
+ public String toString() {
+ return "WriterId{"
+ + "regionId='"
+ + regionId
+ + '\''
+ + ", nodeId="
+ + nodeId
+ + ", writerEpoch="
+ + writerEpoch
+ + '}';
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WriterProgress.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WriterProgress.java
new file mode 100644
index 0000000000000..f38ea770e8ff6
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/WriterProgress.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.poll;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class WriterProgress {
+
+ private final long physicalTime;
+ private final long localSeq;
+
+ public WriterProgress(final long physicalTime, final long localSeq) {
+ this.physicalTime = physicalTime;
+ this.localSeq = localSeq;
+ }
+
+ public long getPhysicalTime() {
+ return physicalTime;
+ }
+
+ public long getLocalSeq() {
+ return localSeq;
+ }
+
+ public void serialize(final DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(physicalTime, stream);
+ ReadWriteIOUtils.write(localSeq, stream);
+ }
+
+ public static WriterProgress deserialize(final ByteBuffer buffer) {
+ return new WriterProgress(ReadWriteIOUtils.readLong(buffer), ReadWriteIOUtils.readLong(buffer));
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof WriterProgress)) {
+ return false;
+ }
+ final WriterProgress that = (WriterProgress) obj;
+ return physicalTime == that.physicalTime && localSeq == that.localSeq;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(physicalTime, localSeq);
+ }
+
+ @Override
+ public String toString() {
+ return "WriterProgress{" + "physicalTime=" + physicalTime + ", localSeq=" + localSeq + '}';
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java
index d649aa567ade4..9fcc1d86b0c75 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeRequestType.java
@@ -31,6 +31,7 @@ public enum PipeSubscribeRequestType {
CLOSE((short) 4),
SUBSCRIBE((short) 5),
UNSUBSCRIBE((short) 6),
+ SEEK((short) 7),
;
private final short type;
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSeekReq.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSeekReq.java
new file mode 100644
index 0000000000000..895417537d5d1
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSeekReq.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.rpc.subscription.payload.poll.TopicProgress;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Objects;
+
+public class PipeSubscribeSeekReq extends TPipeSubscribeReq {
+
+ /** Seek type constants. */
+ public static final short SEEK_TO_BEGINNING = 1;
+
+ public static final short SEEK_TO_END = 2;
+ public static final short SEEK_TO_TIMESTAMP = 3;
+ public static final short SEEK_TO_TOPIC_PROGRESS = 6;
+ public static final short SEEK_AFTER_TOPIC_PROGRESS = 7;
+
+ private transient String topicName;
+ private transient short seekType;
+ private transient long timestamp; // only meaningful when seekType == SEEK_TO_TIMESTAMP
+ private transient TopicProgress topicProgress = new TopicProgress(Collections.emptyMap());
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public short getSeekType() {
+ return seekType;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public TopicProgress getTopicProgress() {
+ return topicProgress;
+ }
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into {@code PipeSubscribeSeekReq}, called by the subscription
+ * client.
+ */
+ public static PipeSubscribeSeekReq toTPipeSubscribeReq(
+ final String topicName, final short seekType, final long timestamp) throws IOException {
+ return toTPipeSubscribeReq(topicName, seekType, timestamp, null);
+ }
+
+ public static PipeSubscribeSeekReq toTPipeSubscribeReq(
+ final String topicName, final TopicProgress topicProgress) throws IOException {
+ return toTPipeSubscribeReq(topicName, SEEK_TO_TOPIC_PROGRESS, 0, topicProgress);
+ }
+
+ public static PipeSubscribeSeekReq toTPipeSubscribeSeekAfterReq(
+ final String topicName, final TopicProgress topicProgress) throws IOException {
+ return toTPipeSubscribeReq(topicName, SEEK_AFTER_TOPIC_PROGRESS, 0, topicProgress);
+ }
+
+ public static PipeSubscribeSeekReq toTPipeSubscribeReq(
+ final String topicName,
+ final short seekType,
+ final long timestamp,
+ final TopicProgress topicProgress)
+ throws IOException {
+ final PipeSubscribeSeekReq req = new PipeSubscribeSeekReq();
+
+ req.topicName = topicName;
+ req.seekType = seekType;
+ req.timestamp = timestamp;
+ req.topicProgress =
+ Objects.nonNull(topicProgress) ? topicProgress : new TopicProgress(Collections.emptyMap());
+
+ req.version = PipeSubscribeRequestVersion.VERSION_1.getVersion();
+ req.type = PipeSubscribeRequestType.SEEK.getType();
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(topicName, outputStream);
+ ReadWriteIOUtils.write(seekType, outputStream);
+ if (seekType == SEEK_TO_TIMESTAMP) {
+ ReadWriteIOUtils.write(timestamp, outputStream);
+ } else if (seekType == SEEK_TO_TOPIC_PROGRESS || seekType == SEEK_AFTER_TOPIC_PROGRESS) {
+ req.topicProgress.serialize(outputStream);
+ }
+ req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
+ return req;
+ }
+
+ /**
+ * Deserialize {@code TPipeSubscribeReq} to obtain parameters, called by the subscription server.
+ */
+ public static PipeSubscribeSeekReq fromTPipeSubscribeReq(final TPipeSubscribeReq seekReq) {
+ final PipeSubscribeSeekReq req = new PipeSubscribeSeekReq();
+
+ if (Objects.nonNull(seekReq.body) && seekReq.body.hasRemaining()) {
+ req.topicName = ReadWriteIOUtils.readString(seekReq.body);
+ req.seekType = ReadWriteIOUtils.readShort(seekReq.body);
+ if (req.seekType == SEEK_TO_TIMESTAMP) {
+ req.timestamp = ReadWriteIOUtils.readLong(seekReq.body);
+ } else if (req.seekType == SEEK_TO_TOPIC_PROGRESS
+ || req.seekType == SEEK_AFTER_TOPIC_PROGRESS) {
+ req.topicProgress = TopicProgress.deserialize(seekReq.body);
+ }
+ }
+
+ req.version = seekReq.version;
+ req.type = seekReq.type;
+
+ return req;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final PipeSubscribeSeekReq that = (PipeSubscribeSeekReq) obj;
+ return Objects.equals(this.topicName, that.topicName)
+ && this.seekType == that.seekType
+ && this.timestamp == that.timestamp
+ && Objects.equals(this.topicProgress, that.topicProgress)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicName, seekType, timestamp, topicProgress, version, type, body);
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSeekResp.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSeekResp.java
new file mode 100644
index 0000000000000..c6ea90d5bb069
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeSeekResp.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.response;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
+
+import java.util.Objects;
+
+public class PipeSubscribeSeekResp extends TPipeSubscribeResp {
+
+ /////////////////////////////// Thrift ///////////////////////////////
+
+ /**
+ * Serialize the incoming parameters into {@code PipeSubscribeSeekResp}, called by the
+ * subscription server.
+ */
+ public static PipeSubscribeSeekResp toTPipeSubscribeResp(final TSStatus status) {
+ final PipeSubscribeSeekResp resp = new PipeSubscribeSeekResp();
+
+ resp.status = status;
+ resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
+ resp.type = PipeSubscribeResponseType.ACK.getType();
+
+ return resp;
+ }
+
+ /**
+ * Deserialize {@code TPipeSubscribeResp} to obtain parameters, called by the subscription client.
+ */
+ public static PipeSubscribeSeekResp fromTPipeSubscribeResp(final TPipeSubscribeResp seekResp) {
+ final PipeSubscribeSeekResp resp = new PipeSubscribeSeekResp();
+
+ resp.status = seekResp.status;
+ resp.version = seekResp.version;
+ resp.type = seekResp.type;
+ resp.body = seekResp.body;
+
+ return resp;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final PipeSubscribeSeekResp that = (PipeSubscribeSeekResp) obj;
+ return Objects.equals(this.status, that.status)
+ && this.version == that.version
+ && this.type == that.type
+ && Objects.equals(this.body, that.body);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, version, type, body);
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTablePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTablePullConsumer.java
index 0168a1ba3846d..abc5e2de2ff92 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTablePullConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTablePullConsumer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import org.apache.iotdb.rpc.subscription.payload.poll.TopicProgress;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import java.time.Duration;
@@ -179,6 +180,19 @@ List poll(final Set topicNames, final Duration time
void commitAsync(
final Iterable messages, final AsyncCommitCallback callback);
+ void seekToBeginning(final String topicName) throws SubscriptionException;
+
+ void seekToEnd(final String topicName) throws SubscriptionException;
+
+ TopicProgress positions(final String topicName) throws SubscriptionException;
+
+ TopicProgress committedPositions(final String topicName) throws SubscriptionException;
+
+ void seek(final String topicName, final TopicProgress topicProgress) throws SubscriptionException;
+
+ void seekAfter(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException;
+
/**
* Retrieves the unique identifier of this consumer. If no consumer ID was provided at the time of
* consumer construction, a random globally unique ID is automatically assigned after the consumer
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTreePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTreePullConsumer.java
index 803b7c51224a4..fc9d55bfe218a 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTreePullConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/ISubscriptionTreePullConsumer.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.session.subscription.consumer;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import org.apache.iotdb.rpc.subscription.payload.poll.TopicProgress;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import java.time.Duration;
@@ -179,6 +180,19 @@ List poll(final Set topicNames, final Duration time
void commitAsync(
final Iterable messages, final AsyncCommitCallback callback);
+ void seekToBeginning(final String topicName) throws SubscriptionException;
+
+ void seekToEnd(final String topicName) throws SubscriptionException;
+
+ TopicProgress positions(final String topicName) throws SubscriptionException;
+
+ TopicProgress committedPositions(final String topicName) throws SubscriptionException;
+
+ void seek(final String topicName, final TopicProgress topicProgress) throws SubscriptionException;
+
+ void seekAfter(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException;
+
/**
* Retrieves the unique identifier of this consumer. If no consumer ID was provided at the time of
* consumer construction, a random globally unique ID is automatically assigned after the consumer
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
index e9fbb1672e563..506b678231340 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java
@@ -34,11 +34,17 @@
import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.RegionProgress;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.TopicProgress;
+import org.apache.iotdb.rpc.subscription.payload.poll.WatermarkPayload;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterId;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterProgress;
+import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSeekReq;
import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
@@ -77,6 +83,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
@@ -88,6 +95,7 @@
import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType.FILE_INIT;
import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType.TABLETS;
import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType.TERMINATION;
+import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType.WATERMARK;
import static org.apache.iotdb.session.subscription.util.SetPartitioner.partition;
abstract class AbstractSubscriptionConsumer implements AutoCloseable {
@@ -121,6 +129,26 @@ abstract class AbstractSubscriptionConsumer implements AutoCloseable {
private final int connectionTimeoutInMs;
private final int maxPollParallelism;
+ /**
+ * The latest watermark timestamp received from the server. Updated when WATERMARK events are
+ * processed and stripped. Consumer users can query this to check timestamp progress.
+ */
+ protected volatile long latestWatermarkTimestamp = Long.MIN_VALUE;
+
+ /** Per-topic current positions used as the consumer-guided positioning hint in poll requests. */
+ private final Map currentPositionsByTopic = new ConcurrentHashMap<>();
+
+ /** Per-topic committed positions used as durable recovery points for explicit seek/checkpoint. */
+ private final Map committedPositionsByTopic = new ConcurrentHashMap<>();
+
+ /**
+ * Ack contexts for consensus messages that were already processed locally but could not be
+ * committed because the original provider became unavailable. They are flushed after the same
+ * topic+region is observed again from a live provider.
+ */
+ private final Map> pendingRedirectAcksByTopicRegion =
+ new ConcurrentHashMap<>();
+
@SuppressWarnings("java:S3077")
protected volatile Map subscribedTopics = new HashMap<>();
@@ -376,6 +404,106 @@ private void unsubscribe(Set topicNames, final boolean needParse)
providers.acquireReadLock();
try {
unsubscribeWithRedirection(topicNames);
+ topicNames.forEach(this::clearPendingRedirectAcks);
+ } finally {
+ providers.releaseReadLock();
+ }
+ }
+
+ /////////////////////////////// seek ///////////////////////////////
+
+ /**
+ * Seeks to the earliest available WAL position. Actual position depends on WAL retention — old
+ * segments may have been reclaimed.
+ */
+ public void seekToBeginning(final String topicName) throws SubscriptionException {
+ checkIfOpened();
+ seekInternal(topicName, PipeSubscribeSeekReq.SEEK_TO_BEGINNING, 0);
+ clearCurrentPositions(topicName);
+ clearCommittedPositions(topicName);
+ clearPendingRedirectAcks(topicName);
+ }
+
+ /** Seeks to the current WAL tail. Only newly written data will be consumed after this. */
+ public void seekToEnd(final String topicName) throws SubscriptionException {
+ checkIfOpened();
+ seekInternal(topicName, PipeSubscribeSeekReq.SEEK_TO_END, 0);
+ clearCurrentPositions(topicName);
+ clearCommittedPositions(topicName);
+ clearPendingRedirectAcks(topicName);
+ }
+
+ /**
+ * Returns the latest observed per-region positions for the given topic. This is the consumer's
+ * current fetch position hint and is sent back to the server on subsequent poll requests.
+ */
+ public TopicProgress positions(final String topicName) throws SubscriptionException {
+ checkIfOpened();
+ final TopicProgress progress = currentPositionsByTopic.get(topicName);
+ return Objects.nonNull(progress)
+ ? new TopicProgress(progress.getRegionProgress())
+ : new TopicProgress(Collections.emptyMap());
+ }
+
+ /**
+ * Returns the latest committed per-region positions for the given topic. This is the recoverable
+ * checkpoint position that should be persisted by callers.
+ */
+ public TopicProgress committedPositions(final String topicName) throws SubscriptionException {
+ checkIfOpened();
+ final TopicProgress progress = committedPositionsByTopic.get(topicName);
+ return Objects.nonNull(progress)
+ ? new TopicProgress(progress.getRegionProgress())
+ : new TopicProgress(Collections.emptyMap());
+ }
+
+ public void seek(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException {
+ checkIfOpened();
+ final TopicProgress safeProgress =
+ Objects.nonNull(topicProgress) ? topicProgress : new TopicProgress(Collections.emptyMap());
+ seekInternalTopicProgress(topicName, safeProgress);
+ overlayCurrentPositions(topicName, safeProgress);
+ overlayCommittedPositions(topicName, safeProgress);
+ clearPendingRedirectAcks(topicName);
+ }
+
+ public void seekAfter(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException {
+ checkIfOpened();
+ final TopicProgress safeProgress =
+ Objects.nonNull(topicProgress) ? topicProgress : new TopicProgress(Collections.emptyMap());
+ seekAfterInternalTopicProgress(topicName, safeProgress);
+ overlayCurrentPositions(topicName, safeProgress);
+ overlayCommittedPositions(topicName, safeProgress);
+ clearPendingRedirectAcks(topicName);
+ }
+
+ private void seekInternal(final String topicName, final short seekType, final long timestamp)
+ throws SubscriptionException {
+ providers.acquireReadLock();
+ try {
+ seekWithRedirection(topicName, seekType, timestamp);
+ } finally {
+ providers.releaseReadLock();
+ }
+ }
+
+ private void seekInternalTopicProgress(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException {
+ providers.acquireReadLock();
+ try {
+ seekWithRedirectionTopicProgress(topicName, topicProgress);
+ } finally {
+ providers.releaseReadLock();
+ }
+ }
+
+ private void seekAfterInternalTopicProgress(
+ final String topicName, final TopicProgress topicProgress) throws SubscriptionException {
+ providers.acquireReadLock();
+ try {
+ seekAfterWithRedirectionTopicProgress(topicName, topicProgress);
} finally {
providers.releaseReadLock();
}
@@ -522,9 +650,44 @@ private Path getFilePath(
unsubscribe(Collections.singleton(topicNameToUnsubscribe), false);
return Optional.empty();
});
+ put(
+ WATERMARK,
+ (resp, timer) -> {
+ final SubscriptionCommitContext commitContext = resp.getCommitContext();
+ final WatermarkPayload payload = (WatermarkPayload) resp.getPayload();
+ return Optional.of(
+ new SubscriptionMessage(
+ commitContext, payload.getWatermarkTimestamp()));
+ });
}
});
+ /**
+ * Returns the set of DataNode IDs for providers that are currently available. Used by subclasses
+ * to detect unavailable DataNodes and notify the progress ordering processor.
+ */
+ protected Set getAvailableDataNodeIds() {
+ providers.acquireReadLock();
+ try {
+ final Set ids = new HashSet<>();
+ for (final AbstractSubscriptionProvider provider : providers.getAllAvailableProviders()) {
+ ids.add(provider.getDataNodeId());
+ }
+ return ids;
+ } finally {
+ providers.releaseReadLock();
+ }
+ }
+
+ /**
+ * Returns the latest watermark timestamp received from the server. This tracks the maximum data
+ * timestamp observed across all polled regions. Returns {@code Long.MIN_VALUE} if no watermark
+ * has been received yet.
+ */
+ public long getLatestWatermarkTimestamp() {
+ return latestWatermarkTimestamp;
+ }
+
protected List multiplePoll(
/* @NotNull */ final Set topicNames, final long timeoutMs) {
if (topicNames.isEmpty()) {
@@ -685,6 +848,8 @@ private List singlePoll(
// add all current messages to result messages
messages.addAll(currentMessages);
+ advanceCurrentPositions(currentMessages);
+ flushPendingRedirectAcks(currentMessages);
// TODO: maybe we can poll a few more times
if (!messages.isEmpty()) {
@@ -1079,7 +1244,7 @@ private List pollInternal(
}
// ignore SubscriptionConnectionException to improve poll auto retry
try {
- return provider.poll(topicNames, timeoutMs);
+ return provider.poll(topicNames, timeoutMs, buildCurrentProgressByTopic(topicNames));
} catch (final SubscriptionConnectionException ignored) {
return Collections.emptyList();
}
@@ -1174,7 +1339,80 @@ private void commit(final Iterable commitContexts, fi
for (final Entry> entry :
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
commitInternal(entry.getKey(), entry.getValue(), nack);
+ if (!nack) {
+ advanceCommittedPositions(entry.getValue());
+ }
+ }
+ }
+
+ protected Set ackWithPartialProgress(
+ final Iterable messages) throws SubscriptionException {
+ final List bufferedMessages = new ArrayList<>();
+ final List commitContexts = new ArrayList<>();
+ for (final SubscriptionMessage message : messages) {
+ bufferedMessages.add(message);
+ commitContexts.add(message.getCommitContext());
+ }
+
+ final Set removableCommitContexts =
+ ackCommitContextsWithPartialProgress(commitContexts);
+ final Set removableMessages = new HashSet<>();
+ for (final SubscriptionMessage message : bufferedMessages) {
+ if (removableCommitContexts.contains(message.getCommitContext())) {
+ removableMessages.add(message);
+ }
}
+ return removableMessages;
+ }
+
+ protected Set ackCommitContextsWithPartialProgress(
+ final Iterable commitContexts) throws SubscriptionException {
+ final Map> dataNodeIdToCommitContexts =
+ new HashMap<>();
+ for (final SubscriptionCommitContext commitContext : commitContexts) {
+ dataNodeIdToCommitContexts
+ .computeIfAbsent(commitContext.getDataNodeId(), ignored -> new ArrayList<>())
+ .add(commitContext);
+ }
+
+ final Set removableCommitContexts = new HashSet<>();
+ for (final Entry> entry :
+ dataNodeIdToCommitContexts.entrySet()) {
+ final List groupedCommitContexts = entry.getValue();
+ try {
+ commitInternal(entry.getKey(), groupedCommitContexts, false);
+ advanceCommittedPositions(groupedCommitContexts);
+ removableCommitContexts.addAll(groupedCommitContexts);
+ } catch (final SubscriptionConnectionException e) {
+ int stagedCount = 0;
+ int retainedCount = 0;
+ for (final SubscriptionCommitContext commitContext : groupedCommitContexts) {
+ if (isConsensusCommitContext(commitContext)) {
+ stagePendingRedirectAck(commitContext);
+ removableCommitContexts.add(commitContext);
+ stagedCount++;
+ } else {
+ retainedCount++;
+ }
+ }
+ if (stagedCount > 0) {
+ LOGGER.warn(
+ "{} staged {} consensus ack(s) for redirect after provider {} became unavailable",
+ this,
+ stagedCount,
+ entry.getKey());
+ }
+ if (retainedCount > 0) {
+ LOGGER.warn(
+ "{} keep {} non-consensus ack(s) pending after provider {} commit failure",
+ this,
+ retainedCount,
+ entry.getKey(),
+ e);
+ }
+ }
+ }
+ return removableCommitContexts;
}
protected void nack(final Iterable messages) throws SubscriptionException {
@@ -1390,6 +1628,365 @@ private void unsubscribeWithRedirection(final Set topicNames)
throw new SubscriptionRuntimeCriticalException(errorMessage);
}
+ /**
+ * Sends seek request to ALL available providers. Unlike subscribe/unsubscribe, seek is only
+ * considered successful if every available provider acknowledges it because data regions for the
+ * topic may be distributed across different nodes.
+ */
+ private void seekWithRedirection(
+ final String topicName, final short seekType, final long timestamp)
+ throws SubscriptionException {
+ final List providers = this.providers.getAllAvailableProviders();
+ if (providers.isEmpty()) {
+ throw new SubscriptionConnectionException(
+ String.format(
+ "Cluster has no available subscription providers when %s seek topic %s",
+ this, topicName));
+ }
+ final List failedProviders = new ArrayList<>();
+ Throwable firstFailure = null;
+ for (final AbstractSubscriptionProvider provider : providers) {
+ try {
+ provider.seek(topicName, seekType, timestamp);
+ } catch (final Exception e) {
+ failedProviders.add(provider);
+ if (Objects.isNull(firstFailure)) {
+ firstFailure = e;
+ }
+ LOGGER.warn(
+ "{} failed to seek topic {} from subscription provider {}; seek requires every provider to succeed, so the client will continue notifying the remaining providers before failing this seek.",
+ this,
+ topicName,
+ provider,
+ e);
+ }
+ }
+ if (!failedProviders.isEmpty()) {
+ final String errorMessage =
+ String.format(
+ "%s failed to seek topic %s on subscription providers %s; seek requires every available provider to succeed",
+ this, topicName, failedProviders);
+ LOGGER.warn(errorMessage);
+ throw new SubscriptionRuntimeCriticalException(errorMessage, firstFailure);
+ }
+ }
+
+ /** Same all-provider success requirement as {@link #seekWithRedirection(String, short, long)}. */
+ private void seekWithRedirectionTopicProgress(
+ final String topicName, final TopicProgress topicProgress) throws SubscriptionException {
+ final List providers = this.providers.getAllAvailableProviders();
+ if (providers.isEmpty()) {
+ throw new SubscriptionConnectionException(
+ String.format(
+ "Cluster has no available subscription providers when %s seek topic %s",
+ this, topicName));
+ }
+ final List failedProviders = new ArrayList<>();
+ Throwable firstFailure = null;
+ for (final AbstractSubscriptionProvider provider : providers) {
+ try {
+ provider.seekToTopicProgress(topicName, topicProgress);
+ } catch (final Exception e) {
+ failedProviders.add(provider);
+ if (Objects.isNull(firstFailure)) {
+ firstFailure = e;
+ }
+ LOGGER.warn(
+ "{} failed to seek topic {} to topicProgress(regionCount={}) from provider {}; seek requires every provider to succeed, so the client will continue notifying the remaining providers before failing this seek.",
+ this,
+ topicName,
+ topicProgress.getRegionProgress().size(),
+ provider,
+ e);
+ }
+ }
+ if (!failedProviders.isEmpty()) {
+ final String errorMessage =
+ String.format(
+ "%s failed to seek topic %s to topicProgress(regionCount=%d) on subscription providers %s; seek requires every available provider to succeed",
+ this, topicName, topicProgress.getRegionProgress().size(), failedProviders);
+ LOGGER.warn(errorMessage);
+ throw new SubscriptionRuntimeCriticalException(errorMessage, firstFailure);
+ }
+ }
+
+ /** Same all-provider success requirement as {@link #seekWithRedirection(String, short, long)}. */
+ private void seekAfterWithRedirectionTopicProgress(
+ final String topicName, final TopicProgress topicProgress) throws SubscriptionException {
+ final List providers = this.providers.getAllAvailableProviders();
+ if (providers.isEmpty()) {
+ throw new SubscriptionConnectionException(
+ String.format(
+ "Cluster has no available subscription providers when %s seekAfter topic %s",
+ this, topicName));
+ }
+ final List failedProviders = new ArrayList<>();
+ Throwable firstFailure = null;
+ for (final AbstractSubscriptionProvider provider : providers) {
+ try {
+ provider.seekAfterTopicProgress(topicName, topicProgress);
+ } catch (final Exception e) {
+ failedProviders.add(provider);
+ if (Objects.isNull(firstFailure)) {
+ firstFailure = e;
+ }
+ LOGGER.warn(
+ "{} failed to seekAfter topic {} to topicProgress(regionCount={}) from provider {}; seek requires every provider to succeed, so the client will continue notifying the remaining providers before failing this seekAfter.",
+ this,
+ topicName,
+ topicProgress.getRegionProgress().size(),
+ provider,
+ e);
+ }
+ }
+ if (!failedProviders.isEmpty()) {
+ final String errorMessage =
+ String.format(
+ "%s failed to seekAfter topic %s to topicProgress(regionCount=%d) on subscription providers %s; seek requires every available provider to succeed",
+ this, topicName, topicProgress.getRegionProgress().size(), failedProviders);
+ LOGGER.warn(errorMessage);
+ throw new SubscriptionRuntimeCriticalException(errorMessage, firstFailure);
+ }
+ }
+
+ private Map buildCurrentProgressByTopic(final Set topicNames) {
+ final Map result = new HashMap<>();
+ for (final String topicName : topicNames) {
+ final TopicProgress topicProgress = currentPositionsByTopic.get(topicName);
+ if (Objects.isNull(topicProgress) || topicProgress.getRegionProgress().isEmpty()) {
+ continue;
+ }
+ result.put(topicName, new TopicProgress(topicProgress.getRegionProgress()));
+ }
+ return result;
+ }
+
+ private void advanceCurrentPositions(final List messages) {
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionCommitContext commitContext = message.getCommitContext();
+ if (Objects.isNull(commitContext) || Objects.isNull(commitContext.getTopicName())) {
+ continue;
+ }
+ mergeTopicProgress(
+ currentPositionsByTopic,
+ commitContext.getTopicName(),
+ extractWriterId(commitContext),
+ extractWriterProgress(commitContext));
+ }
+ }
+
+ private void advanceCommittedPositions(
+ final List subscriptionCommitContexts) {
+ for (final SubscriptionCommitContext commitContext : subscriptionCommitContexts) {
+ if (Objects.isNull(commitContext) || Objects.isNull(commitContext.getTopicName())) {
+ continue;
+ }
+ mergeTopicProgress(
+ committedPositionsByTopic,
+ commitContext.getTopicName(),
+ extractWriterId(commitContext),
+ extractWriterProgress(commitContext));
+ }
+ }
+
+ private boolean isConsensusCommitContext(final SubscriptionCommitContext commitContext) {
+ return Objects.nonNull(commitContext)
+ && Objects.nonNull(commitContext.getWriterId())
+ && Objects.nonNull(commitContext.getWriterProgress())
+ && Objects.nonNull(commitContext.getRegionId())
+ && !commitContext.getRegionId().isEmpty();
+ }
+
+ private String buildTopicRegionKey(final SubscriptionCommitContext commitContext) {
+ return commitContext.getTopicName() + '\u0001' + commitContext.getRegionId();
+ }
+
+ private void stagePendingRedirectAck(final SubscriptionCommitContext commitContext) {
+ pendingRedirectAcksByTopicRegion
+ .computeIfAbsent(
+ buildTopicRegionKey(commitContext), ignored -> ConcurrentHashMap.newKeySet())
+ .add(commitContext);
+ }
+
+ private void flushPendingRedirectAcks(final List currentMessages) {
+ final Map redirectTargetByTopicRegion = new HashMap<>();
+ for (final SubscriptionMessage message : currentMessages) {
+ final SubscriptionCommitContext commitContext = message.getCommitContext();
+ if (!isConsensusCommitContext(commitContext)) {
+ continue;
+ }
+ redirectTargetByTopicRegion.put(
+ buildTopicRegionKey(commitContext), commitContext.getDataNodeId());
+ }
+
+ for (final Entry entry : redirectTargetByTopicRegion.entrySet()) {
+ final Set pendingContexts =
+ pendingRedirectAcksByTopicRegion.get(entry.getKey());
+ if (Objects.isNull(pendingContexts) || pendingContexts.isEmpty()) {
+ continue;
+ }
+
+ final List contextsToRedirect = new ArrayList<>(pendingContexts);
+ try {
+ commitInternal(entry.getValue(), contextsToRedirect, false);
+ advanceCommittedPositions(contextsToRedirect);
+ contextsToRedirect.forEach(pendingContexts::remove);
+ if (pendingContexts.isEmpty()) {
+ pendingRedirectAcksByTopicRegion.remove(entry.getKey(), pendingContexts);
+ }
+ } catch (final SubscriptionException e) {
+ LOGGER.warn(
+ "{} failed to redirect {} pending consensus ack(s) for {} via provider {}",
+ this,
+ contextsToRedirect.size(),
+ entry.getKey(),
+ entry.getValue(),
+ e);
+ }
+ }
+ }
+
+ private boolean isNewerWriterProgress(
+ final long newPhysicalTime,
+ final long newLocalSeq,
+ final long oldPhysicalTime,
+ final long oldLocalSeq) {
+ return newPhysicalTime > oldPhysicalTime
+ || (newPhysicalTime == oldPhysicalTime && newLocalSeq > oldLocalSeq);
+ }
+
+ private void clearCurrentPositions(final String topicName) {
+ currentPositionsByTopic.remove(topicName);
+ }
+
+ private void clearCommittedPositions(final String topicName) {
+ committedPositionsByTopic.remove(topicName);
+ }
+
+ private void clearPendingRedirectAcks(final String topicName) {
+ final String prefix = topicName + '\u0001';
+ pendingRedirectAcksByTopicRegion.keySet().removeIf(key -> key.startsWith(prefix));
+ }
+
+ private void setCurrentPositions(final String topicName, final TopicProgress topicProgress) {
+ if (Objects.isNull(topicProgress) || topicProgress.getRegionProgress().isEmpty()) {
+ currentPositionsByTopic.remove(topicName);
+ return;
+ }
+ currentPositionsByTopic.put(topicName, new TopicProgress(topicProgress.getRegionProgress()));
+ }
+
+ private void setCommittedPositions(final String topicName, final TopicProgress topicProgress) {
+ if (Objects.isNull(topicProgress) || topicProgress.getRegionProgress().isEmpty()) {
+ committedPositionsByTopic.remove(topicName);
+ return;
+ }
+ committedPositionsByTopic.put(topicName, new TopicProgress(topicProgress.getRegionProgress()));
+ }
+
+ private void overlayCurrentPositions(final String topicName, final TopicProgress topicProgress) {
+ overlayTopicProgress(currentPositionsByTopic, topicName, topicProgress);
+ }
+
+ private void overlayCommittedPositions(
+ final String topicName, final TopicProgress topicProgress) {
+ overlayTopicProgress(committedPositionsByTopic, topicName, topicProgress);
+ }
+
+ private void overlayTopicProgress(
+ final Map progressByTopic,
+ final String topicName,
+ final TopicProgress topicProgress) {
+ if (Objects.isNull(topicName)
+ || topicName.isEmpty()
+ || Objects.isNull(topicProgress)
+ || topicProgress.getRegionProgress().isEmpty()) {
+ return;
+ }
+ progressByTopic.compute(
+ topicName,
+ (ignored, oldTopicProgress) -> {
+ final Map mergedRegionProgress =
+ Objects.nonNull(oldTopicProgress)
+ ? new HashMap<>(oldTopicProgress.getRegionProgress())
+ : new HashMap<>();
+ topicProgress
+ .getRegionProgress()
+ .forEach(
+ (regionId, regionProgress) -> {
+ if (Objects.isNull(regionId)
+ || regionId.isEmpty()
+ || Objects.isNull(regionProgress)
+ || regionProgress.getWriterPositions().isEmpty()) {
+ return;
+ }
+ mergedRegionProgress.put(
+ regionId,
+ new RegionProgress(new HashMap<>(regionProgress.getWriterPositions())));
+ });
+ return mergedRegionProgress.isEmpty() ? null : new TopicProgress(mergedRegionProgress);
+ });
+ }
+
+ private WriterId extractWriterId(final SubscriptionCommitContext commitContext) {
+ if (Objects.nonNull(commitContext.getWriterId())) {
+ return commitContext.getWriterId();
+ }
+ if (Objects.isNull(commitContext.getRegionId()) || commitContext.getRegionId().isEmpty()) {
+ return null;
+ }
+ return new WriterId(commitContext.getRegionId(), commitContext.getDataNodeId(), 0L);
+ }
+
+ private WriterProgress extractWriterProgress(final SubscriptionCommitContext commitContext) {
+ if (Objects.nonNull(commitContext.getWriterProgress())) {
+ return commitContext.getWriterProgress();
+ }
+ if (commitContext.getLocalSeq() < 0) {
+ return null;
+ }
+ return new WriterProgress(commitContext.getPhysicalTime(), commitContext.getLocalSeq());
+ }
+
+ private void mergeTopicProgress(
+ final Map progressByTopic,
+ final String topicName,
+ final WriterId writerId,
+ final WriterProgress writerProgress) {
+ if (Objects.isNull(writerId)
+ || Objects.isNull(writerProgress)
+ || Objects.isNull(topicName)
+ || topicName.isEmpty()) {
+ return;
+ }
+ progressByTopic.compute(
+ topicName,
+ (key, oldTopicProgress) -> {
+ final Map regionProgressById =
+ Objects.nonNull(oldTopicProgress)
+ ? new HashMap<>(oldTopicProgress.getRegionProgress())
+ : new HashMap<>();
+ final RegionProgress oldRegionProgress = regionProgressById.get(writerId.getRegionId());
+ final Map writerPositions =
+ Objects.nonNull(oldRegionProgress)
+ ? new HashMap<>(oldRegionProgress.getWriterPositions())
+ : new HashMap<>();
+ writerPositions.merge(
+ writerId,
+ writerProgress,
+ (oldVal, newVal) ->
+ isNewerWriterProgress(
+ newVal.getPhysicalTime(),
+ newVal.getLocalSeq(),
+ oldVal.getPhysicalTime(),
+ oldVal.getLocalSeq())
+ ? newVal
+ : oldVal);
+ regionProgressById.put(writerId.getRegionId(), new RegionProgress(writerPositions));
+ return new TopicProgress(regionProgressById);
+ });
+ }
+
Map fetchAllEndPointsWithRedirection() throws SubscriptionException {
final List providers = this.providers.getAllAvailableProviders();
if (providers.isEmpty()) {
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index 7f3582d195d6a..cfa0390a48300 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -37,11 +37,13 @@
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollRequest;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollRequestType;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import org.apache.iotdb.rpc.subscription.payload.poll.TopicProgress;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHeartbeatReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribePollReq;
+import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSeekReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeSubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeUnsubscribeReq;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHandshakeResp;
@@ -59,6 +61,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -332,14 +335,107 @@ Map unsubscribe(final Set topicNames) throws Subscr
return unsubscribeResp.getTopics();
}
+ void seek(final String topicName, final short seekType, final long timestamp)
+ throws SubscriptionException {
+ final PipeSubscribeSeekReq req;
+ try {
+ req = PipeSubscribeSeekReq.toTPipeSubscribeReq(topicName, seekType, timestamp);
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "IOException occurred when SubscriptionProvider {} serialize seek request for topic {}",
+ this,
+ topicName,
+ e);
+ throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+ }
+ final TPipeSubscribeResp resp;
+ try {
+ resp = getSessionConnection().pipeSubscribe(req);
+ } catch (final TException | IoTDBConnectionException e) {
+ LOGGER.warn(
+ "TException/IoTDBConnectionException occurred when SubscriptionProvider {} seek with request for topic {}, set SubscriptionProvider unavailable",
+ this,
+ topicName,
+ e);
+ setUnavailable();
+ throw new SubscriptionConnectionException(e.getMessage(), e);
+ }
+ verifyPipeSubscribeSuccess(resp.status);
+ }
+
+ void seekToTopicProgress(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException {
+ final PipeSubscribeSeekReq req;
+ try {
+ req = PipeSubscribeSeekReq.toTPipeSubscribeReq(topicName, topicProgress);
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "IOException occurred when SubscriptionProvider {} serialize seek(topicProgress) for topic {}",
+ this,
+ topicName,
+ e);
+ throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+ }
+ final TPipeSubscribeResp resp;
+ try {
+ resp = getSessionConnection().pipeSubscribe(req);
+ } catch (final TException | IoTDBConnectionException e) {
+ LOGGER.warn(
+ "TException/IoTDBConnectionException occurred when SubscriptionProvider {} seek(topicProgress) for topic {}, set SubscriptionProvider unavailable",
+ this,
+ topicName,
+ e);
+ setUnavailable();
+ throw new SubscriptionConnectionException(e.getMessage(), e);
+ }
+ verifyPipeSubscribeSuccess(resp.status);
+ }
+
+ void seekAfterTopicProgress(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException {
+ final PipeSubscribeSeekReq req;
+ try {
+ req = PipeSubscribeSeekReq.toTPipeSubscribeSeekAfterReq(topicName, topicProgress);
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "IOException occurred when SubscriptionProvider {} serialize seekAfter(topicProgress) for topic {}",
+ this,
+ topicName,
+ e);
+ throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
+ }
+ final TPipeSubscribeResp resp;
+ try {
+ resp = getSessionConnection().pipeSubscribe(req);
+ } catch (final TException | IoTDBConnectionException e) {
+ LOGGER.warn(
+ "TException/IoTDBConnectionException occurred when SubscriptionProvider {} seekAfter(topicProgress) for topic {}, set SubscriptionProvider unavailable",
+ this,
+ topicName,
+ e);
+ setUnavailable();
+ throw new SubscriptionConnectionException(e.getMessage(), e);
+ }
+ verifyPipeSubscribeSuccess(resp.status);
+ }
+
List poll(final Set topicNames, final long timeoutMs)
throws SubscriptionException {
+ return poll(topicNames, timeoutMs, Collections.emptyMap());
+ }
+
+ List poll(
+ final Set topicNames,
+ final long timeoutMs,
+ final Map progressByTopic)
+ throws SubscriptionException {
return poll(
new SubscriptionPollRequest(
SubscriptionPollRequestType.POLL.getType(),
new PollPayload(topicNames),
timeoutMs,
- session.getThriftMaxFrameSize()));
+ session.getThriftMaxFrameSize(),
+ progressByTopic));
}
List pollFile(
@@ -447,7 +543,7 @@ private static void verifyPipeSubscribeSuccess(final TSStatus status)
String.format(SUBSCRIPTION_PIPE_TIMEOUT_FORMATTER, status.code, status.message));
case 1900: // SUBSCRIPTION_VERSION_ERROR
case 1901: // SUBSCRIPTION_TYPE_ERROR
- case 1909: // SUBSCRIPTION_MISSING_CUSTOMER
+ case 1909: // SUBSCRIPTION_MISSING_CONSUMER
case 1912: // SUBSCRIPTION_NOT_ENABLED_ERROR
default:
{
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java
index 991857bc685ee..2607baebe2962 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java
@@ -22,8 +22,11 @@
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.TopicProgress;
import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
+import org.apache.iotdb.session.subscription.payload.PollResult;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
@@ -31,6 +34,7 @@
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -65,6 +69,8 @@ public abstract class AbstractSubscriptionPullConsumer extends AbstractSubscript
private final boolean autoCommit;
private final long autoCommitIntervalMs;
+ private final List processors = new ArrayList<>();
+
private SortedMap> uncommittedCommitContexts;
private final AtomicBoolean isClosed = new AtomicBoolean(true);
@@ -135,6 +141,24 @@ public synchronized void close() {
return;
}
+ // flush all processors and commit any remaining buffered messages
+ if (!processors.isEmpty()) {
+ final List flushed = new ArrayList<>();
+ for (final SubscriptionMessageProcessor processor : processors) {
+ final List out = processor.flush();
+ if (out != null) {
+ flushed.addAll(out);
+ }
+ }
+ if (!flushed.isEmpty() && autoCommit) {
+ try {
+ commitSync(flushed);
+ } catch (final SubscriptionException e) {
+ LOGGER.warn("Failed to commit flushed processor messages on close", e);
+ }
+ }
+ }
+
if (autoCommit) {
// commit all uncommitted messages
commitAllUncommittedMessages();
@@ -186,7 +210,7 @@ protected List poll(final Set topicNames, final lon
}
final List messages = multiplePoll(parsedTopicNames, timeoutMs);
- if (messages.isEmpty()) {
+ if (messages.isEmpty() && processors.isEmpty()) {
LOGGER.info(
"SubscriptionPullConsumer {} poll empty message from topics {} after {} millisecond(s)",
this,
@@ -195,6 +219,35 @@ protected List poll(final Set topicNames, final lon
return messages;
}
+ // Apply processor chain if configured
+ List processed = messages;
+ if (!processors.isEmpty()) {
+ for (final SubscriptionMessageProcessor processor : processors) {
+ processed = processor.process(processed);
+ }
+ }
+
+ // Update watermark timestamp before stripping watermark events
+ for (final SubscriptionMessage m : processed) {
+ if (m.getMessageType() == SubscriptionMessageType.WATERMARK.getType()) {
+ final long ts = m.getWatermarkTimestamp();
+ if (ts > latestWatermarkTimestamp) {
+ latestWatermarkTimestamp = ts;
+ }
+ }
+ }
+
+ // Strip system messages — they are only for processors, not for users
+ processed.removeIf(
+ m -> {
+ final short type = m.getMessageType();
+ return type == SubscriptionMessageType.WATERMARK.getType();
+ });
+
+ if (processed.isEmpty()) {
+ return processed;
+ }
+
// add to uncommitted messages
if (autoCommit) {
final long currentTimestamp = System.currentTimeMillis();
@@ -205,12 +258,56 @@ protected List poll(final Set topicNames, final lon
uncommittedCommitContexts
.computeIfAbsent(index, o -> new ConcurrentSkipListSet<>())
.addAll(
- messages.stream()
+ processed.stream()
.map(SubscriptionMessage::getCommitContext)
.collect(Collectors.toList()));
}
- return messages;
+ return processed;
+ }
+
+ /////////////////////////////// processor ///////////////////////////////
+
+ /**
+ * Adds a message processor to the pipeline. Processors are applied in order on each poll() call.
+ *
+ * @param processor the processor to add
+ */
+ protected AbstractSubscriptionPullConsumer addProcessor(
+ final SubscriptionMessageProcessor processor) {
+ processors.add(processor);
+ return this;
+ }
+
+ /**
+ * Polls with processor metadata. Returns a {@link PollResult} containing the messages, the total
+ * number of buffered messages across all processors, and the current watermark.
+ */
+ protected PollResult pollWithInfo(final long timeoutMs) throws SubscriptionException {
+ final List messages = poll(timeoutMs);
+ int totalBuffered = 0;
+ long watermark = -1;
+ for (final SubscriptionMessageProcessor processor : processors) {
+ totalBuffered += processor.getBufferedCount();
+ if (processor instanceof WatermarkProcessor) {
+ watermark = ((WatermarkProcessor) processor).getWatermark();
+ }
+ }
+ return new PollResult(messages, totalBuffered, watermark);
+ }
+
+ protected PollResult pollWithInfo(final Set topicNames, final long timeoutMs)
+ throws SubscriptionException {
+ final List messages = poll(topicNames, timeoutMs);
+ int totalBuffered = 0;
+ long watermark = -1;
+ for (final SubscriptionMessageProcessor processor : processors) {
+ totalBuffered += processor.getBufferedCount();
+ if (processor instanceof WatermarkProcessor) {
+ watermark = ((WatermarkProcessor) processor).getWatermark();
+ }
+ }
+ return new PollResult(messages, totalBuffered, watermark);
}
/////////////////////////////// commit ///////////////////////////////
@@ -242,6 +339,46 @@ protected void commitAsync(
super.commitAsync(messages, callback);
}
+ /////////////////////////////// seek ///////////////////////////////
+
+ /**
+ * Clears uncommitted auto-commit messages after seek to prevent stale acks from committing events
+ * that belonged to the pre-seek position.
+ */
+ @Override
+ public void seekToBeginning(final String topicName) throws SubscriptionException {
+ super.seekToBeginning(topicName);
+ if (autoCommit) {
+ uncommittedCommitContexts.clear();
+ }
+ }
+
+ @Override
+ public void seekToEnd(final String topicName) throws SubscriptionException {
+ super.seekToEnd(topicName);
+ if (autoCommit) {
+ uncommittedCommitContexts.clear();
+ }
+ }
+
+ @Override
+ public void seek(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException {
+ super.seek(topicName, topicProgress);
+ if (autoCommit) {
+ uncommittedCommitContexts.clear();
+ }
+ }
+
+ @Override
+ public void seekAfter(final String topicName, final TopicProgress topicProgress)
+ throws SubscriptionException {
+ super.seekAfter(topicName, topicProgress);
+ if (autoCommit) {
+ uncommittedCommitContexts.clear();
+ }
+ }
+
/////////////////////////////// auto commit ///////////////////////////////
private void submitAutoCommitWorker() {
@@ -278,8 +415,19 @@ public void run() {
for (final Map.Entry> entry :
uncommittedCommitContexts.headMap(index).entrySet()) {
try {
- ackCommitContexts(entry.getValue());
- uncommittedCommitContexts.remove(entry.getKey());
+ final Set removableCommitContexts =
+ ackCommitContextsWithPartialProgress(entry.getValue());
+ if (removableCommitContexts.isEmpty()) {
+ continue;
+ }
+ if (removableCommitContexts.size() == entry.getValue().size()) {
+ uncommittedCommitContexts.remove(entry.getKey());
+ continue;
+ }
+ entry.getValue().removeAll(removableCommitContexts);
+ if (entry.getValue().isEmpty()) {
+ uncommittedCommitContexts.remove(entry.getKey());
+ }
} catch (final Exception e) {
LOGGER.warn("something unexpected happened when auto commit messages...", e);
}
@@ -291,8 +439,19 @@ private void commitAllUncommittedMessages() {
for (final Map.Entry> entry :
uncommittedCommitContexts.entrySet()) {
try {
- ackCommitContexts(entry.getValue());
- uncommittedCommitContexts.remove(entry.getKey());
+ final Set removableCommitContexts =
+ ackCommitContextsWithPartialProgress(entry.getValue());
+ if (removableCommitContexts.isEmpty()) {
+ continue;
+ }
+ if (removableCommitContexts.size() == entry.getValue().size()) {
+ uncommittedCommitContexts.remove(entry.getKey());
+ continue;
+ }
+ entry.getValue().removeAll(removableCommitContexts);
+ if (entry.getValue().isEmpty()) {
+ uncommittedCommitContexts.remove(entry.getKey());
+ }
} catch (final Exception e) {
LOGGER.warn("something unexpected happened when commit messages during close", e);
}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.java
index 3ff93db218b27..1ac9f08696ddb 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumer.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.slf4j.Logger;
@@ -180,6 +181,21 @@ public void run() {
try {
final List messages =
multiplePoll(subscribedTopics.keySet(), autoPollTimeoutMs);
+ // Update watermark timestamp before stripping watermark events
+ for (final SubscriptionMessage m : messages) {
+ if (m.getMessageType() == SubscriptionMessageType.WATERMARK.getType()) {
+ final long ts = m.getWatermarkTimestamp();
+ if (ts > latestWatermarkTimestamp) {
+ latestWatermarkTimestamp = ts;
+ }
+ }
+ }
+ // Strip system messages — push consumer does not use processors
+ messages.removeIf(
+ m -> {
+ final short type = m.getMessageType();
+ return type == SubscriptionMessageType.WATERMARK.getType();
+ });
if (messages.isEmpty()) {
LOGGER.info(
"SubscriptionPushConsumer {} poll empty message from topics {} after {} millisecond(s)",
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java
new file mode 100644
index 0000000000000..13910a86c9abe
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/ColumnAlignProcessor.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.consumer.base;
+
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A non-buffering processor that forward-fills null columns in each Tablet using the last known
+ * value for the same device/table. This is useful for CDC scenarios where a write only updates a
+ * subset of columns, leaving others null; the processor fills them with the most recent value.
+ *
+ * State is maintained per device (identified by {@code Tablet.getDeviceId()} for tree-model or
+ * {@code Tablet.getTableName()} for table-model).
+ */
+public class ColumnAlignProcessor implements SubscriptionMessageProcessor {
+
+ // deviceKey -> (columnIndex -> lastValue)
+ private final Map> lastValues = new HashMap<>();
+
+ @Override
+ public List process(final List messages) {
+ for (final SubscriptionMessage message : messages) {
+ if (message.getMessageType() != SubscriptionMessageType.RECORD_HANDLER.getType()) {
+ continue;
+ }
+ final Iterator tablets = message.getRecordTabletIterator();
+ while (tablets.hasNext()) {
+ fillTablet(tablets.next());
+ }
+ }
+ return messages;
+ }
+
+ @Override
+ public List flush() {
+ return Collections.emptyList();
+ }
+
+ private void fillTablet(final Tablet tablet) {
+ final String deviceKey = getDeviceKey(tablet);
+ final Map cache = lastValues.computeIfAbsent(deviceKey, k -> new HashMap<>());
+
+ final Object[] values = tablet.getValues();
+ final BitMap[] bitMaps = tablet.getBitMaps();
+ final int rowSize = tablet.getRowSize();
+ final int columnCount = values.length;
+
+ for (int row = 0; row < rowSize; row++) {
+ for (int col = 0; col < columnCount; col++) {
+ final boolean isNull =
+ bitMaps != null && bitMaps[col] != null && bitMaps[col].isMarked(row);
+ if (isNull) {
+ // try forward-fill from cache
+ final Object cached = cache.get(col);
+ if (cached != null) {
+ setValueAt(values[col], row, cached);
+ bitMaps[col].unmark(row);
+ }
+ } else {
+ // update cache with this non-null value
+ cache.put(col, getValueAt(values[col], row));
+ }
+ }
+ }
+ }
+
+ private static String getDeviceKey(final Tablet tablet) {
+ // tree model uses deviceId; table model uses tableName
+ final String deviceId = tablet.getDeviceId();
+ return deviceId != null ? deviceId : tablet.getTableName();
+ }
+
+ private static Object getValueAt(final Object columnArray, final int row) {
+ if (columnArray instanceof long[]) {
+ return ((long[]) columnArray)[row];
+ } else if (columnArray instanceof int[]) {
+ return ((int[]) columnArray)[row];
+ } else if (columnArray instanceof double[]) {
+ return ((double[]) columnArray)[row];
+ } else if (columnArray instanceof float[]) {
+ return ((float[]) columnArray)[row];
+ } else if (columnArray instanceof boolean[]) {
+ return ((boolean[]) columnArray)[row];
+ } else if (columnArray instanceof Object[]) {
+ return ((Object[]) columnArray)[row];
+ }
+ return null;
+ }
+
+ private static void setValueAt(final Object columnArray, final int row, final Object value) {
+ if (columnArray instanceof long[]) {
+ ((long[]) columnArray)[row] = (Long) value;
+ } else if (columnArray instanceof int[]) {
+ ((int[]) columnArray)[row] = (Integer) value;
+ } else if (columnArray instanceof double[]) {
+ ((double[]) columnArray)[row] = (Double) value;
+ } else if (columnArray instanceof float[]) {
+ ((float[]) columnArray)[row] = (Float) value;
+ } else if (columnArray instanceof boolean[]) {
+ ((boolean[]) columnArray)[row] = (Boolean) value;
+ } else if (columnArray instanceof Object[]) {
+ ((Object[]) columnArray)[row] = value;
+ }
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/SubscriptionMessageProcessor.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/SubscriptionMessageProcessor.java
new file mode 100644
index 0000000000000..ceee674cd6901
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/SubscriptionMessageProcessor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.consumer.base;
+
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+
+import java.util.List;
+
+/**
+ * A processor that transforms, filters, or enriches subscription messages in the pull consumer
+ * pipeline. Processors are chained and invoked on each poll() call.
+ *
+ * Processors may buffer messages internally (e.g., for watermark-based ordering) and return them
+ * in later process() calls. Buffered messages should be released via {@link #flush()} when the
+ * consumer closes.
+ */
+public interface SubscriptionMessageProcessor {
+
+ /**
+ * Process a batch of messages. May return fewer, more, or different messages than the input.
+ *
+ * @param messages the messages from the previous stage (or raw poll)
+ * @return messages to pass to the next stage (or to the user)
+ */
+ List process(List messages);
+
+ /**
+ * Flush all internally buffered messages. Called when the consumer is closing.
+ *
+ * @return any remaining buffered messages
+ */
+ List flush();
+
+ /** Returns the number of messages currently buffered by this processor. */
+ default int getBufferedCount() {
+ return 0;
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/WatermarkProcessor.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/WatermarkProcessor.java
new file mode 100644
index 0000000000000..8c17896ce5de5
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/WatermarkProcessor.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.consumer.base;
+
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+
+import org.apache.tsfile.write.record.Tablet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * A buffering processor that reorders messages based on watermark semantics. Messages are buffered
+ * internally and emitted only when the watermark advances past their maximum timestamp.
+ *
+ * Watermark = (minimum of latest timestamp per active source) - maxOutOfOrdernessMs
+ *
+ *
A source is considered "stale" if its latest timestamp has not increased for {@code
+ * staleSourceTimeoutMs}. Stale sources are excluded from the watermark calculation, preventing a
+ * single slow or idle source from anchoring the global watermark indefinitely.
+ *
+ *
Server-side WATERMARK events (carrying per-region timestamp progress) serve as heartbeats,
+ * confirming source liveness. They advance the per-source timestamp only when their timestamp is
+ * higher than the previously observed value.
+ *
+ *
A timeout mechanism ensures that buffered messages are eventually flushed even if no new data
+ * arrives, preventing unbounded buffering.
+ *
+ *
Note: This processor is primarily intended as a reference implementation. For
+ * production use with large-scale out-of-order data, consider using a downstream stream processing
+ * framework (Flink, Spark) for watermark handling.
+ */
+public class WatermarkProcessor implements SubscriptionMessageProcessor {
+
+ private static final long DEFAULT_STALE_SOURCE_TIMEOUT_MS = 30_000L;
+ private static final long DEFAULT_MAX_BUFFER_BYTES = 64L * 1024 * 1024; // 64 MB
+
+ private final long maxOutOfOrdernessMs;
+ private final long timeoutMs;
+ private final long staleSourceTimeoutMs;
+ private final long maxBufferBytes;
+
+ // Buffer ordered by message max timestamp
+ private final PriorityQueue buffer =
+ new PriorityQueue<>((a, b) -> Long.compare(a.maxTimestamp, b.maxTimestamp));
+
+ // Track latest timestamp per source (deviceId/tableName)
+ private final java.util.Map latestPerSource = new java.util.HashMap<>();
+ // Track wall-clock time when each source's timestamp last increased
+ private final java.util.Map lastAdvancedTimeMs = new java.util.HashMap<>();
+ private long lastEmitTimeMs = System.currentTimeMillis();
+ private long bufferedBytes = 0;
+
+ // Current watermark value
+ private long watermark = Long.MIN_VALUE;
+
+ /**
+ * Creates a WatermarkProcessor with default stale source timeout (30 seconds).
+ *
+ * @param maxOutOfOrdernessMs maximum expected out-of-orderness in milliseconds
+ * @param timeoutMs if no data arrives within this duration, force-flush all buffered messages
+ */
+ public WatermarkProcessor(final long maxOutOfOrdernessMs, final long timeoutMs) {
+ this(maxOutOfOrdernessMs, timeoutMs, DEFAULT_STALE_SOURCE_TIMEOUT_MS, DEFAULT_MAX_BUFFER_BYTES);
+ }
+
+ /**
+ * Creates a WatermarkProcessor.
+ *
+ * @param maxOutOfOrdernessMs maximum expected out-of-orderness in milliseconds
+ * @param timeoutMs if no data arrives within this duration, force-flush all buffered messages
+ * @param staleSourceTimeoutMs if a source's timestamp has not increased for this duration, it is
+ * excluded from watermark calculation. Use {@link Long#MAX_VALUE} to disable.
+ * @param maxBufferBytes maximum total estimated bytes of buffered messages. When exceeded, all
+ * buffered messages are force-flushed regardless of watermark. Defaults to 64 MB.
+ */
+ public WatermarkProcessor(
+ final long maxOutOfOrdernessMs,
+ final long timeoutMs,
+ final long staleSourceTimeoutMs,
+ final long maxBufferBytes) {
+ this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
+ this.timeoutMs = timeoutMs;
+ this.staleSourceTimeoutMs = staleSourceTimeoutMs;
+ this.maxBufferBytes = maxBufferBytes;
+ }
+
+ @Override
+ public List process(final List messages) {
+ final long now = System.currentTimeMillis();
+
+ // Buffer incoming messages and update per-source timestamps
+ for (final SubscriptionMessage message : messages) {
+ // WATERMARK events carry server-side timestamp progress per region.
+ // They serve as heartbeats and advance per-source tracking only when the timestamp
+ // actually increases.
+ if (message.getMessageType() == SubscriptionMessageType.WATERMARK.getType()) {
+ final String regionKey =
+ "region-"
+ + message.getCommitContext().getDataNodeId()
+ + "-"
+ + message.getCommitContext().getRegionId();
+ advanceSourceTimestamp(regionKey, message.getWatermarkTimestamp(), now);
+ continue; // Do not buffer system events
+ }
+
+ final long maxTs = extractMaxTimestamp(message);
+ final long estimatedSize = message.estimateSize();
+ buffer.add(new TimestampedMessage(message, maxTs, estimatedSize));
+ bufferedBytes += estimatedSize;
+ updateSourceTimestamp(message, maxTs, now);
+ }
+
+ // Compute watermark = min(latest per active source) - maxOutOfOrderness
+ // Sources whose timestamp has not increased for staleSourceTimeoutMs are excluded.
+ if (!latestPerSource.isEmpty()) {
+ long minLatest = Long.MAX_VALUE;
+ for (final java.util.Map.Entry entry : latestPerSource.entrySet()) {
+ final Long lastAdv = lastAdvancedTimeMs.get(entry.getKey());
+ if (lastAdv != null && (now - lastAdv) <= staleSourceTimeoutMs) {
+ minLatest = Math.min(minLatest, entry.getValue());
+ }
+ }
+ if (minLatest != Long.MAX_VALUE) {
+ watermark = minLatest - maxOutOfOrdernessMs;
+ }
+ // If all sources are stale, watermark stays unchanged and timeout will handle it.
+ }
+
+ // Emit messages whose maxTimestamp <= watermark
+ final List emitted = emit(watermark);
+
+ // Buffer overflow: force-flush all if buffer exceeds byte limit
+ if (bufferedBytes > maxBufferBytes) {
+ return forceFlushAll();
+ }
+
+ // Timeout: if nothing was emitted and timeout exceeded, force-flush all
+ if (emitted.isEmpty() && (now - lastEmitTimeMs) >= timeoutMs && !buffer.isEmpty()) {
+ return forceFlushAll();
+ }
+
+ if (!emitted.isEmpty()) {
+ lastEmitTimeMs = now;
+ }
+ return emitted;
+ }
+
+ @Override
+ public List flush() {
+ return forceFlushAll();
+ }
+
+ @Override
+ public int getBufferedCount() {
+ return buffer.size();
+ }
+
+ /** Returns the current watermark value. */
+ public long getWatermark() {
+ return watermark;
+ }
+
+ private List emit(final long watermarkValue) {
+ final List result = new ArrayList<>();
+ while (!buffer.isEmpty() && buffer.peek().maxTimestamp <= watermarkValue) {
+ final TimestampedMessage tm = buffer.poll();
+ bufferedBytes -= tm.estimatedSize;
+ result.add(tm.message);
+ }
+ return result;
+ }
+
+ private List forceFlushAll() {
+ final List result = new ArrayList<>(buffer.size());
+ while (!buffer.isEmpty()) {
+ result.add(buffer.poll().message);
+ }
+ bufferedBytes = 0;
+ lastEmitTimeMs = System.currentTimeMillis();
+ return result;
+ }
+
+ private static long extractMaxTimestamp(final SubscriptionMessage message) {
+ long maxTs = Long.MIN_VALUE;
+ if (message.getMessageType() == SubscriptionMessageType.RECORD_HANDLER.getType()) {
+ final Iterator it = message.getRecordTabletIterator();
+ while (it.hasNext()) {
+ final Tablet tablet = it.next();
+ final long[] timestamps = tablet.getTimestamps();
+ final int rowSize = tablet.getRowSize();
+ for (int i = 0; i < rowSize; i++) {
+ maxTs = Math.max(maxTs, timestamps[i]);
+ }
+ }
+ }
+ // For non-tablet messages or empty messages, use current wall clock
+ if (maxTs == Long.MIN_VALUE) {
+ maxTs = System.currentTimeMillis();
+ }
+ return maxTs;
+ }
+
+ private void updateSourceTimestamp(
+ final SubscriptionMessage message, final long maxTs, final long nowMs) {
+ // Use region-based key so data events and WATERMARK events share the same key namespace.
+ final String regionId = message.getCommitContext().getRegionId();
+ final int dataNodeId = message.getCommitContext().getDataNodeId();
+ final String key = "region-" + dataNodeId + "-" + regionId;
+ advanceSourceTimestamp(key, maxTs, nowMs);
+ }
+
+ /**
+ * Updates the per-source timestamp tracking. Only records a new "last advanced" wall-clock time
+ * when the timestamp actually increases, so that stale sources (whose timestamps don't advance)
+ * are eventually excluded from watermark calculation.
+ */
+ private void advanceSourceTimestamp(final String key, final long newTs, final long nowMs) {
+ final Long oldTs = latestPerSource.get(key);
+ if (oldTs == null || newTs > oldTs) {
+ latestPerSource.put(key, newTs);
+ lastAdvancedTimeMs.put(key, nowMs);
+ }
+ }
+
+ private static final class TimestampedMessage {
+ final SubscriptionMessage message;
+ final long maxTimestamp;
+ final long estimatedSize;
+
+ TimestampedMessage(
+ final SubscriptionMessage message, final long maxTimestamp, final long estimatedSize) {
+ this.message = message;
+ this.maxTimestamp = maxTimestamp;
+ this.estimatedSize = estimatedSize;
+ }
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
index 83dd39aebbf7d..e3fb90cda470a 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java
@@ -25,6 +25,8 @@
import org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
import org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionProvider;
import org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.consumer.base.SubscriptionMessageProcessor;
+import org.apache.iotdb.session.subscription.payload.PollResult;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import java.time.Duration;
@@ -173,4 +175,24 @@ public String getConsumerGroupId() {
public boolean allTopicMessagesHaveBeenConsumed() {
return super.allTopicMessagesHaveBeenConsumed();
}
+
+ /////////////////////////////// processor ///////////////////////////////
+
+ public SubscriptionTablePullConsumer addProcessor(final SubscriptionMessageProcessor processor) {
+ super.addProcessor(processor);
+ return this;
+ }
+
+ public PollResult pollWithInfo(final long timeoutMs) throws SubscriptionException {
+ return super.pollWithInfo(timeoutMs);
+ }
+
+ public PollResult pollWithInfo(final Duration timeout) throws SubscriptionException {
+ return super.pollWithInfo(timeout.toMillis());
+ }
+
+ public PollResult pollWithInfo(final Set topicNames, final long timeoutMs)
+ throws SubscriptionException {
+ return super.pollWithInfo(topicNames, timeoutMs);
+ }
}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
index 23050893f660d..c4daab68839aa 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java
@@ -27,6 +27,8 @@
import org.apache.iotdb.session.subscription.consumer.ISubscriptionTreePullConsumer;
import org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionProvider;
import org.apache.iotdb.session.subscription.consumer.base.AbstractSubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.consumer.base.SubscriptionMessageProcessor;
+import org.apache.iotdb.session.subscription.payload.PollResult;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
@@ -220,6 +222,26 @@ public boolean allTopicMessagesHaveBeenConsumed() {
return super.allTopicMessagesHaveBeenConsumed();
}
+ /////////////////////////////// processor ///////////////////////////////
+
+ public SubscriptionTreePullConsumer addProcessor(final SubscriptionMessageProcessor processor) {
+ super.addProcessor(processor);
+ return this;
+ }
+
+ public PollResult pollWithInfo(final long timeoutMs) throws SubscriptionException {
+ return super.pollWithInfo(timeoutMs);
+ }
+
+ public PollResult pollWithInfo(final Duration timeout) throws SubscriptionException {
+ return super.pollWithInfo(timeout.toMillis());
+ }
+
+ public PollResult pollWithInfo(final Set topicNames, final long timeoutMs)
+ throws SubscriptionException {
+ return super.pollWithInfo(topicNames, timeoutMs);
+ }
+
/////////////////////////////// builder ///////////////////////////////
@Deprecated // keep for forward compatibility
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/PollResult.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/PollResult.java
new file mode 100644
index 0000000000000..be56548116e11
--- /dev/null
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/PollResult.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.payload;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Result of a poll operation that includes processor metadata alongside the messages. */
+public class PollResult {
+
+ private final List messages;
+ private final int bufferedCount;
+ private final long watermark;
+
+ public PollResult(
+ final List messages, final int bufferedCount, final long watermark) {
+ this.messages = messages != null ? messages : Collections.emptyList();
+ this.bufferedCount = bufferedCount;
+ this.watermark = watermark;
+ }
+
+ /** Returns the processed messages ready for consumption. */
+ public List getMessages() {
+ return messages;
+ }
+
+ /** Returns the total number of messages currently buffered across all processors. */
+ public int getBufferedCount() {
+ return bufferedCount;
+ }
+
+ /**
+ * Returns the current watermark timestamp (-1 if no watermark processor is configured). Messages
+ * with timestamps at or before this value have all been emitted.
+ */
+ public long getWatermark() {
+ return watermark;
+ }
+
+ @Override
+ public String toString() {
+ return "PollResult{messages="
+ + messages.size()
+ + ", bufferedCount="
+ + bufferedCount
+ + ", watermark="
+ + watermark
+ + "}";
+ }
+}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java
index b4ea6f0166f1b..a2a7c9df51c8e 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java
@@ -40,6 +40,9 @@ public class SubscriptionMessage implements Comparable {
private final SubscriptionMessageHandler handler;
+ /** Watermark timestamp, valid only when messageType == WATERMARK. */
+ private final long watermarkTimestamp;
+
private volatile boolean userDataRemoved = false;
public SubscriptionMessage(
@@ -47,6 +50,7 @@ public SubscriptionMessage(
this.commitContext = commitContext;
this.messageType = SubscriptionMessageType.RECORD_HANDLER.getType();
this.handler = new SubscriptionRecordHandler(tablets);
+ this.watermarkTimestamp = Long.MIN_VALUE;
}
public SubscriptionMessage(
@@ -56,6 +60,16 @@ public SubscriptionMessage(
this.commitContext = commitContext;
this.messageType = SubscriptionMessageType.TS_FILE.getType();
this.handler = new SubscriptionTsFileHandler(absolutePath, databaseName);
+ this.watermarkTimestamp = Long.MIN_VALUE;
+ }
+
+ /** Watermark message carrying server-side timestamp progress for a region. */
+ public SubscriptionMessage(
+ final SubscriptionCommitContext commitContext, final long watermarkTimestamp) {
+ this.commitContext = commitContext;
+ this.messageType = SubscriptionMessageType.WATERMARK.getType();
+ this.handler = null;
+ this.watermarkTimestamp = watermarkTimestamp;
}
public SubscriptionCommitContext getCommitContext() {
@@ -66,12 +80,42 @@ public short getMessageType() {
return messageType;
}
+ /**
+ * Returns the watermark timestamp carried by this message. Only valid when {@code
+ * getMessageType() == SubscriptionMessageType.WATERMARK.getType()}.
+ *
+ * @return the watermark timestamp, or {@code Long.MIN_VALUE} if not a watermark message
+ */
+ public long getWatermarkTimestamp() {
+ return watermarkTimestamp;
+ }
+
+ /**
+ * Estimates the heap memory occupied by this message in bytes. For tablet-based messages, this
+ * delegates to {@link Tablet#ramBytesUsed()} for accurate per-column estimation.
+ *
+ * @return estimated byte size
+ */
+ public long estimateSize() {
+ // Object header + references + primitives (rough constant)
+ long size = 64;
+ if (handler instanceof SubscriptionRecordHandler) {
+ final Iterator it = getRecordTabletIterator();
+ while (it.hasNext()) {
+ size += it.next().ramBytesUsed();
+ }
+ }
+ return size;
+ }
+
public void removeUserData() {
if (userDataRemoved) {
return;
}
- handler.removeUserData();
+ if (Objects.nonNull(handler)) {
+ handler.removeUserData();
+ }
if (handler instanceof SubscriptionRecordHandler) {
userDataRemoved = true;
}
@@ -89,13 +133,14 @@ public boolean equals(final Object obj) {
}
final SubscriptionMessage that = (SubscriptionMessage) obj;
return Objects.equals(this.commitContext, that.commitContext)
+ && this.watermarkTimestamp == that.watermarkTimestamp
&& Objects.equals(this.messageType, that.messageType)
&& Objects.equals(this.handler, that.handler);
}
@Override
public int hashCode() {
- return Objects.hash(commitContext, messageType, handler);
+ return Objects.hash(commitContext, messageType, handler, watermarkTimestamp);
}
@Override
@@ -109,6 +154,8 @@ public String toString() {
+ commitContext
+ ", messageType="
+ SubscriptionMessageType.valueOf(messageType).toString()
+ + ", watermarkTimestamp="
+ + watermarkTimestamp
+ "}";
}
diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageType.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageType.java
index 34189c2fa9b42..0732c0590c181 100644
--- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageType.java
+++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageType.java
@@ -26,6 +26,7 @@
public enum SubscriptionMessageType {
RECORD_HANDLER((short) 0),
TS_FILE((short) 1),
+ WATERMARK((short) 3),
;
private final short type;
diff --git a/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionCommitContextTest.java b/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionCommitContextTest.java
new file mode 100644
index 0000000000000..4c70d25bfd68d
--- /dev/null
+++ b/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionCommitContextTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.poll;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SubscriptionCommitContextTest {
+
+ @Test
+ public void testDeserializeCurrentCommitIdContext() throws IOException {
+ final SubscriptionCommitContext original =
+ new SubscriptionCommitContext(1, 2, "topic", "group", 3L);
+ final ByteBuffer buffer = SubscriptionCommitContext.serialize(original);
+
+ final SubscriptionCommitContext context = SubscriptionCommitContext.deserialize(buffer);
+
+ assertEquals(1, context.getDataNodeId());
+ assertEquals(2, context.getRebootTimes());
+ assertEquals("topic", context.getTopicName());
+ assertEquals("group", context.getConsumerGroupId());
+ assertEquals(3L, context.getCommitId());
+ assertEquals(0L, context.getSeekGeneration());
+ assertEquals("", context.getRegionId());
+ assertEquals(0L, context.getPhysicalTime());
+ assertFalse(context.hasWriterProgress());
+ assertTrue(context.isCommittable());
+ }
+
+ @Test
+ public void testDeserializeCurrentPhysicalTimeContext() throws IOException {
+ final SubscriptionCommitContext original =
+ new SubscriptionCommitContext(1, 2, "topic", "group", 3L, 4L, "region", 5L);
+
+ final ByteBuffer buffer = SubscriptionCommitContext.serialize(original);
+ final SubscriptionCommitContext parsed = SubscriptionCommitContext.deserialize(buffer);
+
+ assertEquals(original, parsed);
+ assertFalse(parsed.hasWriterProgress());
+ assertTrue(parsed.isCommittable());
+ }
+
+ @Test
+ public void testDeserializeV2() throws IOException {
+ final WriterId writerId = new WriterId("region", 7, 8L);
+ final WriterProgress writerProgress = new WriterProgress(9L, 10L);
+ final SubscriptionCommitContext original =
+ new SubscriptionCommitContext(1, 2, "topic", "group", 3L, writerId, writerProgress);
+
+ final ByteBuffer buffer = SubscriptionCommitContext.serialize(original);
+ final SubscriptionCommitContext parsed = SubscriptionCommitContext.deserialize(buffer);
+
+ assertEquals(original, parsed);
+ assertEquals(writerId, parsed.getWriterId());
+ assertEquals(writerProgress, parsed.getWriterProgress());
+ assertEquals("region", parsed.getRegionId());
+ assertEquals(9L, parsed.getPhysicalTime());
+ assertEquals(10L, parsed.getLocalSeq());
+ assertTrue(parsed.hasWriterProgress());
+ assertTrue(parsed.isCommittable());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDeserializeUnsupportedVersion() throws IOException {
+ final ByteBuffer buffer = buildCurrentBufferWithVersion((byte) 1, 1, 2, "topic", "group", 3L);
+ SubscriptionCommitContext.deserialize(buffer);
+ }
+
+ private static ByteBuffer buildCurrentBuffer(
+ final int dataNodeId,
+ final int rebootTimes,
+ final String topicName,
+ final String consumerGroupId,
+ final long commitId)
+ throws IOException {
+ return buildCurrentBufferWithVersion(
+ (byte) 2, dataNodeId, rebootTimes, topicName, consumerGroupId, commitId);
+ }
+
+ private static ByteBuffer buildCurrentBufferWithVersion(
+ final byte version,
+ final int dataNodeId,
+ final int rebootTimes,
+ final String topicName,
+ final String consumerGroupId,
+ final long commitId)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(version, outputStream);
+ ReadWriteIOUtils.write(dataNodeId, outputStream);
+ ReadWriteIOUtils.write(rebootTimes, outputStream);
+ ReadWriteIOUtils.write(topicName, outputStream);
+ ReadWriteIOUtils.write(consumerGroupId, outputStream);
+ ReadWriteIOUtils.write(commitId, outputStream);
+ ReadWriteIOUtils.write(0L, outputStream);
+ ReadWriteIOUtils.write("", outputStream);
+ ReadWriteIOUtils.write(0L, outputStream);
+ ReadWriteIOUtils.write((byte) 0, outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+ }
+}
diff --git a/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollRequestTest.java b/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollRequestTest.java
new file mode 100644
index 0000000000000..ecfea3d160bc4
--- /dev/null
+++ b/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollRequestTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.poll;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class SubscriptionPollRequestTest {
+
+ @Test
+ public void testRoundTripWithProgressByTopic() throws IOException {
+ final Map writerPositions = new LinkedHashMap<>();
+ writerPositions.put(new WriterId("1_100", 7, 2L), new WriterProgress(1001L, 11L));
+ writerPositions.put(new WriterId("1_100", 8, 1L), new WriterProgress(999L, 9L));
+
+ final TopicProgress topicProgress =
+ new TopicProgress(Collections.singletonMap("1_100", new RegionProgress(writerPositions)));
+ final Map progressByTopic = new LinkedHashMap<>();
+ progressByTopic.put("topicA", topicProgress);
+
+ final SubscriptionPollRequest original =
+ new SubscriptionPollRequest(
+ SubscriptionPollRequestType.POLL.getType(),
+ new PollPayload(Collections.singleton("topicA")),
+ 1234L,
+ 4096L,
+ progressByTopic);
+
+ final ByteBuffer serialized = SubscriptionPollRequest.serialize(original);
+ final SubscriptionPollRequest parsed = SubscriptionPollRequest.deserialize(serialized);
+
+ assertEquals(original.getRequestType(), parsed.getRequestType());
+ assertEquals(original.getTimeoutMs(), parsed.getTimeoutMs());
+ assertEquals(original.getMaxBytes(), parsed.getMaxBytes());
+ assertEquals(original.getPayload(), parsed.getPayload());
+ assertEquals(progressByTopic, parsed.getProgressByTopic());
+ }
+}
diff --git a/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSeekReqTest.java b/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSeekReqTest.java
new file mode 100644
index 0000000000000..c2afb43110289
--- /dev/null
+++ b/iotdb-client/subscription/src/test/java/org/apache/iotdb/rpc/subscription/payload/request/PipeSubscribeSeekReqTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.request;
+
+import org.apache.iotdb.rpc.subscription.payload.poll.RegionProgress;
+import org.apache.iotdb.rpc.subscription.payload.poll.TopicProgress;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterId;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterProgress;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class PipeSubscribeSeekReqTest {
+
+ @Test
+ public void testTopicProgressSeekRoundTrip() throws IOException {
+ final Map writerPositions = new LinkedHashMap<>();
+ writerPositions.put(new WriterId("1_100", 1, 2L), new WriterProgress(1000L, 10L));
+ final TopicProgress original =
+ new TopicProgress(Collections.singletonMap("1_100", new RegionProgress(writerPositions)));
+
+ final PipeSubscribeSeekReq req =
+ PipeSubscribeSeekReq.toTPipeSubscribeSeekAfterReq("topicA", original);
+ final PipeSubscribeSeekReq parsed = PipeSubscribeSeekReq.fromTPipeSubscribeReq(req);
+
+ assertEquals(PipeSubscribeSeekReq.SEEK_AFTER_TOPIC_PROGRESS, parsed.getSeekType());
+ assertEquals("topicA", parsed.getTopicName());
+ assertEquals(original, parsed.getTopicProgress());
+ }
+}
diff --git a/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/WatermarkProcessorTest.java b/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/WatermarkProcessorTest.java
new file mode 100644
index 0000000000000..613090650bd1a
--- /dev/null
+++ b/iotdb-client/subscription/src/test/java/org/apache/iotdb/session/subscription/consumer/base/WatermarkProcessorTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.consumer.base;
+
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class WatermarkProcessorTest {
+
+ private static final String TOPIC = "topic1";
+ private static final String GROUP = "group1";
+ private static final String REGION_R1 = "R1";
+ private static final String REGION_R2 = "R2";
+
+ private static SubscriptionMessage dataMsg(final String regionId, final int dataNodeId) {
+ final SubscriptionCommitContext ctx =
+ new SubscriptionCommitContext(dataNodeId, 0, TOPIC, GROUP, 0L, 0L, regionId, 0L);
+ return new SubscriptionMessage(ctx, Collections.emptyMap());
+ }
+
+ private static SubscriptionMessage watermarkMsg(
+ final String regionId, final int dataNodeId, final long watermarkTs) {
+ final SubscriptionCommitContext ctx =
+ new SubscriptionCommitContext(dataNodeId, 0, TOPIC, GROUP, 0L, 0L, regionId, 0L);
+ return new SubscriptionMessage(ctx, watermarkTs);
+ }
+
+ @Test
+ public void testSingleRegionRelease() {
+ final WatermarkProcessor proc = new WatermarkProcessor(5, 60_000);
+
+ final List result =
+ proc.process(Collections.singletonList(watermarkMsg(REGION_R1, 1, 1000)));
+
+ Assert.assertTrue(result.isEmpty());
+ Assert.assertEquals(995, proc.getWatermark());
+ }
+
+ @Test
+ public void testTwoRegionsMinWatermark() {
+ final WatermarkProcessor proc = new WatermarkProcessor(10, 60_000);
+
+ proc.process(Arrays.asList(watermarkMsg(REGION_R1, 1, 2000), watermarkMsg(REGION_R2, 1, 500)));
+
+ Assert.assertEquals(490, proc.getWatermark());
+ }
+
+ @Test
+ public void testWatermarkAdvancesIdleRegion() {
+ final WatermarkProcessor proc = new WatermarkProcessor(5, 60_000);
+
+ proc.process(Arrays.asList(watermarkMsg(REGION_R1, 1, 2000), watermarkMsg(REGION_R2, 1, 500)));
+ Assert.assertEquals(495, proc.getWatermark());
+
+ proc.process(Collections.singletonList(watermarkMsg(REGION_R2, 1, 1500)));
+ Assert.assertEquals(1495, proc.getWatermark());
+
+ proc.process(Collections.singletonList(watermarkMsg(REGION_R2, 1, 3000)));
+ Assert.assertEquals(1995, proc.getWatermark());
+ }
+
+ @Test
+ public void testWatermarkEventsNotBuffered() {
+ final WatermarkProcessor proc = new WatermarkProcessor(5, 60_000);
+
+ proc.process(Collections.singletonList(watermarkMsg(REGION_R1, 1, 1000)));
+
+ Assert.assertEquals(0, proc.getBufferedCount());
+ }
+
+ @Test
+ public void testFlushReleasesAll() {
+ final WatermarkProcessor proc = new WatermarkProcessor(5, 60_000);
+
+ proc.process(Arrays.asList(dataMsg(REGION_R1, 1), dataMsg(REGION_R1, 1)));
+
+ proc.flush();
+ Assert.assertEquals(0, proc.getBufferedCount());
+ }
+
+ @Test
+ public void testWatermarkNoRegression() {
+ final WatermarkProcessor proc = new WatermarkProcessor(10, 60_000);
+
+ proc.process(Collections.singletonList(watermarkMsg(REGION_R1, 1, 2000)));
+ Assert.assertEquals(1990, proc.getWatermark());
+
+ proc.process(Collections.singletonList(watermarkMsg(REGION_R1, 1, 1500)));
+ Assert.assertEquals(1990, proc.getWatermark());
+ }
+
+ @Test
+ public void testMultipleWatermarksInSingleBatch() {
+ final WatermarkProcessor proc = new WatermarkProcessor(0, 60_000);
+
+ proc.process(
+ Arrays.asList(
+ watermarkMsg(REGION_R1, 1, 100),
+ watermarkMsg(REGION_R2, 1, 200),
+ watermarkMsg(REGION_R1, 1, 300)));
+
+ Assert.assertEquals(200, proc.getWatermark());
+ }
+
+ @Test
+ public void testEmptyInput() {
+ final WatermarkProcessor proc = new WatermarkProcessor(5, 60_000);
+
+ final List result = proc.process(Collections.emptyList());
+ Assert.assertTrue(result.isEmpty());
+ Assert.assertEquals(Long.MIN_VALUE, proc.getWatermark());
+ }
+
+ @Test
+ public void testThreeRegionsSlowestDeterminesWatermark() {
+ final WatermarkProcessor proc = new WatermarkProcessor(10, 60_000);
+
+ proc.process(
+ Arrays.asList(
+ watermarkMsg(REGION_R1, 1, 5000),
+ watermarkMsg(REGION_R2, 1, 3000),
+ watermarkMsg("R3", 2, 4000)));
+
+ Assert.assertEquals(2990, proc.getWatermark());
+
+ proc.process(Collections.singletonList(watermarkMsg(REGION_R2, 1, 6000)));
+ Assert.assertEquals(3990, proc.getWatermark());
+ }
+
+ @Test
+ public void testZeroOutOfOrderness() {
+ final WatermarkProcessor proc = new WatermarkProcessor(0, 60_000);
+
+ proc.process(Collections.singletonList(watermarkMsg(REGION_R1, 1, 1000)));
+ Assert.assertEquals(1000, proc.getWatermark());
+ }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index e5753bf1bd184..7f20f8cbfd03a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -79,6 +79,8 @@ public enum CnToDnAsyncRequestType {
TOPIC_PUSH_MULTI_META,
CONSUMER_GROUP_PUSH_ALL_META,
CONSUMER_GROUP_PUSH_SINGLE_META,
+ PULL_COMMIT_PROGRESS,
+ SUBSCRIPTION_PUSH_RUNTIME,
// TEMPLATE
UPDATE_TEMPLATE,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index cd69f8b2c846d..4faea49d2fb7f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -47,6 +47,7 @@
import org.apache.iotdb.confignode.client.async.handlers.rpc.TreeDeviceViewFieldDetectionHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.PullCommitProgressRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
@@ -83,6 +84,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TKillQueryInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPullCommitProgressReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiTopicMetaReq;
@@ -90,6 +92,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TPushSingleConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushSingleTopicMetaReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPushSubscriptionRuntimeReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
@@ -224,6 +227,16 @@ protected void initActionMapBuilder() {
(req, client, handler) ->
client.pushSingleConsumerGroupMeta(
(TPushSingleConsumerGroupMetaReq) req, (ConsumerGroupPushMetaRPCHandler) handler));
+ actionMapBuilder.put(
+ CnToDnAsyncRequestType.PULL_COMMIT_PROGRESS,
+ (req, client, handler) ->
+ client.pullCommitProgress(
+ (TPullCommitProgressReq) req, (PullCommitProgressRPCHandler) handler));
+ actionMapBuilder.put(
+ CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME,
+ (req, client, handler) ->
+ client.pushSubscriptionRuntime(
+ (TPushSubscriptionRuntimeReq) req, (DataNodeTSStatusRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.PIPE_HEARTBEAT,
(req, client, handler) ->
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index b2e2ec3232781..084998aa04825 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -29,12 +29,14 @@
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.PullCommitProgressRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TDeviceViewResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPullCommitProgressResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
@@ -169,6 +171,14 @@ public static DataNodeAsyncRequestRPCHandler> buildHandler(
dataNodeLocationMap,
(Map) responseMap,
countDownLatch);
+ case PULL_COMMIT_PROGRESS:
+ return new PullCommitProgressRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ dataNodeLocationMap,
+ (Map) responseMap,
+ countDownLatch);
case CHANGE_REGION_LEADER:
return new TransferLeaderRPCHandler(
requestType,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeTSStatusRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeTSStatusRPCHandler.java
index 7c93f363dd4b8..bd8042071480a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeTSStatusRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeTSStatusRPCHandler.java
@@ -48,22 +48,19 @@ public DataNodeTSStatusRPCHandler(
@Override
public void onComplete(TSStatus response) {
- // Put response
responseMap.put(requestId, response);
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // Remove only if success
nodeLocationMap.remove(requestId);
LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
} else {
- LOGGER.error(
+ logFailure(
"Failed to {} on DataNode: {}, response: {}",
requestType,
formattedTargetLocation,
response);
}
- // Always CountDown
countDownLatch.countDown();
}
@@ -76,14 +73,21 @@ public void onError(Exception e) {
+ formattedTargetLocation
+ ", exception: "
+ e.getMessage();
- LOGGER.error(errorMsg);
+ logFailure(errorMsg);
responseMap.put(
requestId,
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
- // Always CountDown
countDownLatch.countDown();
}
+
+ private void logFailure(final String format, final Object... args) {
+ if (requestType == CnToDnAsyncRequestType.SUBSCRIPTION_PUSH_RUNTIME) {
+ LOGGER.warn(format, args);
+ } else {
+ LOGGER.error(format, args);
+ }
+ }
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/ConsumerGroupPushMetaRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/ConsumerGroupPushMetaRPCHandler.java
index 2938d4f85b7cd..67ee9f372d747 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/ConsumerGroupPushMetaRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/ConsumerGroupPushMetaRPCHandler.java
@@ -49,23 +49,19 @@ public ConsumerGroupPushMetaRPCHandler(
@Override
public void onComplete(TPushConsumerGroupMetaResp response) {
- // Put response
responseMap.put(requestId, response);
if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
+ LOGGER.debug("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
} else {
- LOGGER.error(
+ LOGGER.warn(
"Failed to {} on DataNode: {}, response: {}",
requestType,
formattedTargetLocation,
response);
}
- // Always remove to avoid retrying
nodeLocationMap.remove(requestId);
-
- // Always CountDown
countDownLatch.countDown();
}
@@ -78,14 +74,13 @@ public void onError(Exception e) {
+ formattedTargetLocation
+ ", exception: "
+ e.getMessage();
- LOGGER.error(errorMsg, e);
+ LOGGER.warn(errorMsg);
responseMap.put(
requestId,
new TPushConsumerGroupMetaResp(
RpcUtils.getStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR, errorMsg)));
- // Always CountDown
countDownLatch.countDown();
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/PullCommitProgressRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/PullCommitProgressRPCHandler.java
new file mode 100644
index 0000000000000..a34dd627f320f
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/PullCommitProgressRPCHandler.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers.rpc.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.DataNodeAsyncRequestRPCHandler;
+import org.apache.iotdb.mpp.rpc.thrift.TPullCommitProgressResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class PullCommitProgressRPCHandler
+ extends DataNodeAsyncRequestRPCHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PullCommitProgressRPCHandler.class);
+
+ public PullCommitProgressRPCHandler(
+ CnToDnAsyncRequestType requestType,
+ int requestId,
+ TDataNodeLocation targetDataNode,
+ Map dataNodeLocationMap,
+ Map responseMap,
+ CountDownLatch countDownLatch) {
+ super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
+ }
+
+ @Override
+ public void onComplete(TPullCommitProgressResp response) {
+ responseMap.put(requestId, response);
+
+ if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logSuspiciousRegionProgressPayloads(response);
+ LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
+ } else {
+ LOGGER.error(
+ "Failed to {} on DataNode: {}, response: {}",
+ requestType,
+ formattedTargetLocation,
+ response);
+ }
+
+ nodeLocationMap.remove(requestId);
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ String errorMsg =
+ "Failed to "
+ + requestType
+ + " on DataNode: "
+ + formattedTargetLocation
+ + ", exception: "
+ + e.getMessage();
+ LOGGER.error(errorMsg, e);
+
+ responseMap.put(
+ requestId,
+ new TPullCommitProgressResp(
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, errorMsg)));
+
+ countDownLatch.countDown();
+ }
+
+ private void logSuspiciousRegionProgressPayloads(final TPullCommitProgressResp response) {
+ if (response == null || !response.isSetCommitRegionProgress()) {
+ return;
+ }
+ for (final Map.Entry entry :
+ response.getCommitRegionProgress().entrySet()) {
+ if (isSuspiciousRegionProgressPayload(entry.getValue())) {
+ LOGGER.warn(
+ "PULL_COMMIT_PROGRESS confignode recv suspicious payload from DataNode {}, key={}, summary={}",
+ formattedTargetLocation,
+ entry.getKey(),
+ summarizeRegionProgressPayload(entry.getValue()));
+ }
+ }
+ }
+
+ private boolean isSuspiciousRegionProgressPayload(final java.nio.ByteBuffer buffer) {
+ if (buffer == null) {
+ return true;
+ }
+ final java.nio.ByteBuffer duplicate = buffer.slice();
+ if (duplicate.remaining() < Integer.BYTES) {
+ return true;
+ }
+ final int firstInt = duplicate.getInt();
+ return firstInt < 0 || firstInt > 1_000_000;
+ }
+
+ private String summarizeRegionProgressPayload(final java.nio.ByteBuffer buffer) {
+ if (buffer == null) {
+ return "null";
+ }
+ final int position = buffer.position();
+ final int limit = buffer.limit();
+ final int capacity = buffer.capacity();
+ final java.nio.ByteBuffer duplicate = buffer.slice();
+ final int remaining = duplicate.remaining();
+ final String firstIntSummary;
+ if (remaining >= Integer.BYTES) {
+ final int firstInt = duplicate.getInt();
+ firstIntSummary = firstInt + "(0x" + String.format("%08x", firstInt) + ")";
+ duplicate.position(0);
+ } else {
+ firstIntSummary = "n/a";
+ }
+ final int sampleLength = Math.min(16, remaining);
+ final byte[] sample = new byte[sampleLength];
+ duplicate.get(sample, 0, sampleLength);
+ return "pos="
+ + position
+ + ", limit="
+ + limit
+ + ", capacity="
+ + capacity
+ + ", remaining="
+ + remaining
+ + ", firstInt="
+ + firstIntSummary
+ + ", firstBytes="
+ + bytesToHex(sample);
+ }
+
+ private String bytesToHex(final byte[] bytes) {
+ if (bytes == null || bytes.length == 0) {
+ return "";
+ }
+ final StringBuilder builder = new StringBuilder(bytes.length * 2);
+ for (final byte b : bytes) {
+ builder.append(String.format("%02x", b));
+ }
+ return builder.toString();
+ }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/TopicPushMetaRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/TopicPushMetaRPCHandler.java
index 91ffdd7232b3f..2f5e609f0cfec 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/TopicPushMetaRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/subscription/TopicPushMetaRPCHandler.java
@@ -48,23 +48,19 @@ public TopicPushMetaRPCHandler(
@Override
public void onComplete(TPushTopicMetaResp response) {
- // Put response
responseMap.put(requestId, response);
if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
+ LOGGER.debug("Successfully {} on DataNode: {}", requestType, formattedTargetLocation);
} else {
- LOGGER.error(
+ LOGGER.warn(
"Failed to {} on DataNode: {}, response: {}",
requestType,
formattedTargetLocation,
response);
}
- // Always remove to avoid retrying
nodeLocationMap.remove(requestId);
-
- // Always CountDown
countDownLatch.countDown();
}
@@ -77,13 +73,12 @@ public void onError(Exception e) {
+ formattedTargetLocation
+ ", exception: "
+ e.getMessage();
- LOGGER.error(errorMsg, e);
+ LOGGER.warn(errorMsg);
responseMap.put(
requestId,
new TPushTopicMetaResp(RpcUtils.getStatus(TSStatusCode.TOPIC_PUSH_META_ERROR, errorMsg)));
- // Always CountDown
countDownLatch.countDown();
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
index ffe333b56dd78..1eb46fdc330c4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java
@@ -87,6 +87,7 @@
import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.AlterConsumerGroupPlan;
+import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.CommitProgressHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterMultipleTopicsPlan;
import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterTopicPlan;
@@ -542,6 +543,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case ConsumerGroupHandleMetaChange:
plan = new ConsumerGroupHandleMetaChangePlan();
break;
+ case CommitProgressHandleMetaChange:
+ plan = new CommitProgressHandleMetaChangePlan();
+ break;
case PipeUnsetTemplate:
plan = new PipeUnsetSchemaTemplatePlan();
break;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index fe04b93d9ad4b..ae10e9898f251 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -324,6 +324,8 @@ public enum ConfigPhysicalPlanType {
ShowSubscription((short) 2000),
+ CommitProgressHandleMetaChange((short) 2001),
+
// Authority version after and equal 2.0
DropUserV2((short) 2100),
UpdateUserV2((short) 2101),
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/subscription/consumer/runtime/CommitProgressHandleMetaChangePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/subscription/consumer/runtime/CommitProgressHandleMetaChangePlan.java
new file mode 100644
index 0000000000000..387b0a43b4a61
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/subscription/consumer/runtime/CommitProgressHandleMetaChangePlan.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime;
+
+import org.apache.iotdb.commons.subscription.meta.consumer.CommitProgressKeeper;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/** Consensus plan for handling per-region commit progress meta changes. */
+public class CommitProgressHandleMetaChangePlan extends ConfigPhysicalPlan {
+
+ private Map regionProgressMap = new HashMap<>();
+
+ public CommitProgressHandleMetaChangePlan() {
+ super(ConfigPhysicalPlanType.CommitProgressHandleMetaChange);
+ }
+
+ public CommitProgressHandleMetaChangePlan(final Map regionProgressMap) {
+ super(ConfigPhysicalPlanType.CommitProgressHandleMetaChange);
+ this.regionProgressMap = regionProgressMap;
+ }
+
+ public Map getRegionProgressMap() {
+ return regionProgressMap;
+ }
+
+ @Override
+ protected void serializeImpl(final DataOutputStream stream) throws IOException {
+ stream.writeShort(getType().getPlanType());
+ stream.writeInt(regionProgressMap.size());
+ for (final Map.Entry entry : regionProgressMap.entrySet()) {
+ final byte[] keyBytes = entry.getKey().getBytes("UTF-8");
+ final ByteBuffer valueBuffer = entry.getValue().asReadOnlyBuffer();
+ valueBuffer.rewind();
+ final byte[] valueBytes = new byte[valueBuffer.remaining()];
+ valueBuffer.get(valueBytes);
+ stream.writeInt(keyBytes.length);
+ stream.write(keyBytes);
+ stream.writeInt(valueBytes.length);
+ stream.write(valueBytes);
+ }
+ }
+
+ @Override
+ protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
+ regionProgressMap = CommitProgressKeeper.deserializeRegionProgressFromBuffer(buffer);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final CommitProgressHandleMetaChangePlan that = (CommitProgressHandleMetaChangePlan) obj;
+ return Objects.equals(this.regionProgressMap, that.regionProgressMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(regionProgressMap);
+ }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 182dc2f9fb249..d8c06062756a3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -191,6 +191,8 @@
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetCommitProgressReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetCommitProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDataNodeLocationsResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
@@ -254,6 +256,9 @@
import org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.payload.poll.RegionProgress;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterId;
+import org.apache.iotdb.rpc.subscription.payload.poll.WriterProgress;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -264,6 +269,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URL;
@@ -276,8 +283,10 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -2508,6 +2517,83 @@ public TGetAllSubscriptionInfoResp getAllSubscriptionInfo() {
: new TGetAllSubscriptionInfoResp(status, Collections.emptyList());
}
+ public TGetCommitProgressResp getCommitProgress(TGetCommitProgressReq req) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return new TGetCommitProgressResp(status);
+ }
+ final String key =
+ req.getConsumerGroupId()
+ + "##"
+ + req.getTopicName()
+ + "##"
+ + req.getRegionId()
+ + "##"
+ + req.getDataNodeId();
+ final String keyPrefix =
+ req.getConsumerGroupId() + "##" + req.getTopicName() + "##" + req.getRegionId() + "##";
+ final org.apache.iotdb.commons.subscription.meta.consumer.CommitProgressKeeper keeper =
+ subscriptionManager
+ .getSubscriptionCoordinator()
+ .getSubscriptionInfo()
+ .getCommitProgressKeeper();
+ final Map mergedWriterPositions = new LinkedHashMap<>();
+
+ for (final Map.Entry entry : keeper.getAllRegionProgress().entrySet()) {
+ if (!entry.getKey().startsWith(keyPrefix)) {
+ continue;
+ }
+ final RegionProgress regionProgress = deserializeRegionProgress(entry.getValue());
+ if (Objects.isNull(regionProgress)) {
+ continue;
+ }
+ for (final Map.Entry writerEntry :
+ regionProgress.getWriterPositions().entrySet()) {
+ mergedWriterPositions.merge(
+ writerEntry.getKey(),
+ writerEntry.getValue(),
+ (oldProgress, newProgress) ->
+ compareWriterProgress(newProgress, oldProgress) > 0 ? newProgress : oldProgress);
+ }
+ }
+ final TGetCommitProgressResp resp =
+ new TGetCommitProgressResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ if (!mergedWriterPositions.isEmpty()) {
+ resp.setCommittedRegionProgress(
+ serializeRegionProgress(new RegionProgress(mergedWriterPositions)));
+ }
+ return resp;
+ }
+
+ private static RegionProgress deserializeRegionProgress(final ByteBuffer buffer) {
+ if (Objects.isNull(buffer)) {
+ return null;
+ }
+ final ByteBuffer duplicate = buffer.asReadOnlyBuffer();
+ duplicate.rewind();
+ return RegionProgress.deserialize(duplicate);
+ }
+
+ private static ByteBuffer serializeRegionProgress(final RegionProgress regionProgress) {
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream dos = new DataOutputStream(baos)) {
+ regionProgress.serialize(dos);
+ dos.flush();
+ return ByteBuffer.wrap(baos.toByteArray()).asReadOnlyBuffer();
+ } catch (final IOException e) {
+ throw new RuntimeException("Failed to serialize region progress " + regionProgress, e);
+ }
+ }
+
+ private static int compareWriterProgress(
+ final WriterProgress leftProgress, final WriterProgress rightProgress) {
+ int cmp = Long.compare(leftProgress.getPhysicalTime(), rightProgress.getPhysicalTime());
+ if (cmp != 0) {
+ return cmp;
+ }
+ return Long.compare(leftProgress.getLocalSeq(), rightProgress.getLocalSeq());
+ }
+
@Override
public TPipeConfigTransferResp handleTransferConfigPlan(TPipeConfigTransferReq req) {
TSStatus status = confirmLeader();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 2c5a77303d9b9..3354948d0e552 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -115,7 +115,9 @@
import org.apache.iotdb.confignode.procedure.impl.schema.table.view.SetViewPropertiesProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.CreateConsumerProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.DropConsumerProcedure;
+import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime.CommitProgressSyncProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime.ConsumerGroupMetaSyncProcedure;
+import org.apache.iotdb.confignode.procedure.impl.subscription.runtime.SubscriptionHandleLeaderChangeProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.subscription.CreateSubscriptionProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.subscription.DropSubscriptionProcedure;
import org.apache.iotdb.confignode.procedure.impl.subscription.topic.CreateTopicProcedure;
@@ -1666,6 +1668,21 @@ public void pipeHandleLeaderChange(
}
}
+ public void subscriptionHandleLeaderChange(
+ Map> regionGroupToOldAndNewLeaderPairMap,
+ long runtimeVersion) {
+ try {
+ final long procedureId =
+ executor.submitProcedure(
+ new SubscriptionHandleLeaderChangeProcedure(
+ regionGroupToOldAndNewLeaderPairMap, runtimeVersion));
+ LOGGER.info(
+ "SubscriptionHandleLeaderChangeProcedure was submitted, procedureId: {}.", procedureId);
+ } catch (Exception e) {
+ LOGGER.warn("SubscriptionHandleLeaderChangeProcedure was failed to submit.", e);
+ }
+ }
+
public void pipeHandleMetaChange(
boolean needWriteConsensusOnConfigNodes, boolean needPushPipeMetaToDataNodes) {
try {
@@ -1815,6 +1832,23 @@ public TSStatus consumerGroupMetaSync() {
}
}
+ public TSStatus commitProgressSync() {
+ try {
+ CommitProgressSyncProcedure procedure = new CommitProgressSyncProcedure();
+ executor.submitProcedure(procedure);
+ TSStatus status = waitingProcedureFinished(procedure);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ } else {
+ return new TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode())
+ .setMessage(wrapTimeoutMessageForPipeProcedure(status.getMessage()));
+ }
+ } catch (Exception e) {
+ return new TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode())
+ .setMessage(e.getMessage());
+ }
+ }
+
public TSStatus createSubscription(TSubscribeReq req) {
try {
CreateSubscriptionProcedure procedure = new CreateSubscriptionProcedure(req);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 993bfc0e40066..55d9417f30a2b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -88,6 +88,8 @@ public LoadManager(IManager configManager) {
this.topologyService = new TopologyService(configManager, loadCache::updateTopology);
this.eventService = new EventService(loadCache);
this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
+ this.eventService.register(
+ configManager.getSubscriptionManager().getSubscriptionLeaderChangeHandler());
this.eventService.register(routeBalancer);
this.eventService.register(topologyService);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionManager.java
index 1080b067fae82..ff06e20cf2dc7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionManager.java
@@ -20,17 +20,32 @@
package org.apache.iotdb.confignode.manager.subscription;
import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.subscription.runtime.SubscriptionLeaderChangeHandler;
+import org.apache.iotdb.confignode.manager.subscription.runtime.SubscriptionRuntimeCoordinator;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
public class SubscriptionManager {
private final SubscriptionCoordinator subscriptionCoordinator;
+ private final SubscriptionRuntimeCoordinator subscriptionRuntimeCoordinator;
+ private final SubscriptionLeaderChangeHandler subscriptionLeaderChangeHandler;
public SubscriptionManager(ConfigManager configManager, SubscriptionInfo subscriptionInfo) {
this.subscriptionCoordinator = new SubscriptionCoordinator(configManager, subscriptionInfo);
+ this.subscriptionRuntimeCoordinator = new SubscriptionRuntimeCoordinator(configManager);
+ this.subscriptionLeaderChangeHandler =
+ new SubscriptionLeaderChangeHandler(subscriptionRuntimeCoordinator);
}
public SubscriptionCoordinator getSubscriptionCoordinator() {
return subscriptionCoordinator;
}
+
+ public SubscriptionRuntimeCoordinator getSubscriptionRuntimeCoordinator() {
+ return subscriptionRuntimeCoordinator;
+ }
+
+ public SubscriptionLeaderChangeHandler getSubscriptionLeaderChangeHandler() {
+ return subscriptionLeaderChangeHandler;
+ }
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionMetaSyncer.java
index de49987e13fbe..4931a2948fc61 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionMetaSyncer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionMetaSyncer.java
@@ -106,6 +106,13 @@ private synchronized void sync() {
return;
}
+ // sync commit progress if syncing consumer group meta successfully
+ final TSStatus commitProgressSyncStatus = procedureManager.commitProgressSync();
+ if (commitProgressSyncStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn("Failed to sync commit progress. Result status: {}.", commitProgressSyncStatus);
+ return;
+ }
+
LOGGER.info(
"After this successful sync, if SubscriptionInfo is empty during this sync and has not been modified afterwards, all subsequent syncs will be skipped");
isLastSubscriptionSyncSuccessful = true;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/runtime/SubscriptionLeaderChangeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/runtime/SubscriptionLeaderChangeHandler.java
new file mode 100644
index 0000000000000..6b888e424aa9c
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/runtime/SubscriptionLeaderChangeHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.subscription.runtime;
+
+import org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
+import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+public class SubscriptionLeaderChangeHandler implements IClusterStatusSubscriber {
+
+ private final SubscriptionRuntimeCoordinator runtimeCoordinator;
+
+ public SubscriptionLeaderChangeHandler(final SubscriptionRuntimeCoordinator runtimeCoordinator) {
+ this.runtimeCoordinator = runtimeCoordinator;
+ }
+
+ @Override
+ public void onNodeStatisticsChanged(final NodeStatisticsChangeEvent event) {
+ runtimeCoordinator.handleNodeStatisticsChange(event);
+ }
+
+ @Override
+ public void onConsensusGroupStatisticsChanged(final ConsensusGroupStatisticsChangeEvent event) {
+ runtimeCoordinator.handleLeaderChangeEvent(event);
+ }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/runtime/SubscriptionRuntimeCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/runtime/SubscriptionRuntimeCoordinator.java
new file mode 100644
index 0000000000000..399327b0119e4
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/runtime/SubscriptionRuntimeCoordinator.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.subscription.runtime;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
+import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SubscriptionRuntimeCoordinator {
+
+ private final ConfigManager configManager;
+ private final Map