diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java index 2ebad93348cfd..5be609aba2f5a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java @@ -111,8 +111,8 @@ public void testSingleEnv() throws Exception { TestUtils.executeNonQueries( senderEnv, Arrays.asList( - "drop pipe a2b_history", - "drop pipe a2b_realtime", + "drop pipe if exists a2b_history", + "drop pipe if exists a2b_realtime", String.format( "create pipe a2b1 with source ('inclusion'='schema') with sink ('node-urls'='%s')", receiverDataNode.getIpAndPortString()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java index 532ebf601597d..b5f383619e153 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java @@ -69,6 +69,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY; @@ -109,6 +111,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY; @@ -308,6 +311,10 @@ private void customizeServer(final PipeParameters parameters) { if (securityPolicies.isEmpty()) { throw new PipeException("The security policy cannot be empty."); } + final long debounceTimeMs = + parameters.getLongOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY, SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY), + CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE); synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) { serverKey = httpsBindPort + ":" + tcpBindPort; @@ -327,7 +334,8 @@ private void customizeServer(final PipeParameters parameters) { .setPassword(password) .setSecurityDir(securityDir) .setEnableAnonymousAccess(enableAnonymousAccess) - .setSecurityPolicies(securityPolicies); + .setSecurityPolicies(securityPolicies) + .setDebounceTimeMs(debounceTimeMs); final OpcUaServer newServer = builder.build(); nameSpace = new OpcUaNameSpace(newServer, builder); nameSpace.startup(); @@ -341,7 +349,8 @@ private void customizeServer(final PipeParameters parameters) { password, securityDir, enableAnonymousAccess, - securityPolicies); + securityPolicies, + debounceTimeMs); return oldValue; } } catch (final PipeException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index 3c79a3aa30454..d1182e59b52c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -47,7 +47,7 @@ import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode; import org.eclipse.milo.opcua.sdk.server.nodes.UaNode; import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode; -import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel; +import org.eclipse.milo.opcua.stack.core.AttributeId; import org.eclipse.milo.opcua.stack.core.Identifiers; import org.eclipse.milo.opcua.stack.core.UaException; import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; @@ -57,6 +57,7 @@ import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,20 +72,34 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class); public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server"; - private final SubscriptionModel subscriptionModel; private final OpcUaServerBuilder builder; + // Do not use subscription model because the original subscription model has some bugs + private final ConcurrentMap> nodeSubscriptions = new ConcurrentHashMap<>(); + + // Debounce task cache: used to merge updates within a short period of time, avoiding unnecessary + // duplicate pushes + private final ConcurrentMap> debounceTasks = new ConcurrentHashMap<>(); + // Debounce interval: within 10ms, the same node is updated multiple times, and only the last one + // will be pushed (can be adjusted according to your site delay requirements, the minimum can be + // set to 1ms) + private final long debounceIntervalMs; + public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) { super(server, NAMESPACE_URI); this.builder = builder; + debounceIntervalMs = builder.getDebounceTimeMs(); - subscriptionModel = new SubscriptionModel(server, this); - getLifecycleManager().addLifecycle(subscriptionModel); getLifecycleManager() .addLifecycle( new Lifecycle() { @@ -291,7 +306,7 @@ private void transferTabletRowForClientServerModel( if (Objects.isNull(measurementNode.getValue()) || Objects.isNull(measurementNode.getValue().getSourceTime()) || measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) { - measurementNode.setValue(dataValue); + notifyNodeValueChange(measurementNode.getNodeId(), dataValue, measurementNode); } } else { value = values.get(i); @@ -311,9 +326,11 @@ private void transferTabletRowForClientServerModel( if (Objects.isNull(valueNode.getValue()) || Objects.isNull(valueNode.getValue().getSourceTime()) || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) { - valueNode.setValue( + notifyNodeValueChange( + valueNode.getNodeId(), new DataValue( - new Variant(value), currentQuality, new DateTime(timestamp), new DateTime())); + new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()), + valueNode); } } } @@ -546,24 +563,131 @@ public static NodeId convertToOpcDataType(final TSDataType type) { } } + /** + * On point value changing, notify all subscribed clients proactively + * + * @param nodeId NodeId of the changing node + * @param newValue New value of the node (DataValue object containing value, status code, and + * timestamp) + * @param variableNode Corresponding UaVariableNode instance, used to update the local cached + * value of the node + */ + public void notifyNodeValueChange( + NodeId nodeId, DataValue newValue, UaVariableNode variableNode) { + // 1. Update the local cached value of the node + variableNode.setValue(newValue); + + // 2. If there are no subscribers, return directly without doing any extra operations + List subscribedItems = nodeSubscriptions.get(nodeId); + if (subscribedItems == null || subscribedItems.isEmpty()) { + return; + } + + // 2. Debounce+Async Push: Asynchronously push the expensive push operation, while merging + // high-frequency repeated updates + debounceTasks.compute( + nodeId, + (k, oldTask) -> { + // If there is already a pending push task, cancel it, we only need the latest value + if (oldTask != null && !oldTask.isDone()) { + oldTask.cancel(false); + } + + // Submit the push task to the Milo's scheduled thread pool, delay DEBOUNCE_INTERVAL_MS + // execution + return getServer() + .getScheduledExecutorService() + .schedule( + () -> { + try { + // Batch push changes to all subscribers, this time-consuming operation is put + // into the thread pool, not blocking your data update thread + for (DataItem item : subscribedItems) { + try { + item.setValue(newValue); + } catch (Exception e) { + // Single client push failure does not affect other clients + LOGGER.warn( + "Failed to push value change to client, nodeId={}", nodeId, e); + } + } + } finally { + // Task execution completed, clean up the debounce cache + debounceTasks.remove(nodeId); + } + }, + debounceIntervalMs, + TimeUnit.MILLISECONDS); + }); + } + @Override public void onDataItemsCreated(final List dataItems) { - subscriptionModel.onDataItemsCreated(dataItems); + for (DataItem item : dataItems) { + final ReadValueId readValueId = item.getReadValueId(); + // Only handle Value attribute subscription (align with the original SubscriptionModel logic, + // ignore other attribute subscriptions) + if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) { + continue; + } + final NodeId nodeId = readValueId.getNodeId(); + + // 1. Add the new subscription item to the subscription mapping + nodeSubscriptions.compute( + nodeId, + (k, existingList) -> { + List list = + existingList != null ? existingList : new CopyOnWriteArrayList<>(); + list.add(item); + return list; + }); + + // 2. 【Key Optimization】Proactively push the current node's initial value when the new + // subscription item is created + // Eliminate Bad_WaitingForInitialData, no need to wait for any polling + try { + UaVariableNode node = (UaVariableNode) getNodeManager().getNode(nodeId).orElse(null); + if (node != null && node.getValue() != null) { + // Immediately push the current value to the new subscriber, the client will instantly be + // able to get the initial data + item.setValue(node.getValue()); + } + } catch (Exception e) { + LOGGER.warn("Failed to send initial value to new subscription, nodeId={}", nodeId, e); + } + } } @Override public void onDataItemsModified(final List dataItems) { - subscriptionModel.onDataItemsModified(dataItems); + // Push mode, client modifies subscription parameters (e.g. sampling interval) has no effect on + // our active push, no additional processing is needed } @Override public void onDataItemsDeleted(final List dataItems) { - subscriptionModel.onDataItemsDeleted(dataItems); + for (DataItem item : dataItems) { + final ReadValueId readValueId = item.getReadValueId(); + if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) { + continue; + } + final NodeId nodeId = readValueId.getNodeId(); + + // When the client cancels the subscription, remove this subscription item from the mapping + nodeSubscriptions.computeIfPresent( + nodeId, + (k, existingList) -> { + existingList.remove(item); + // Automatically clean up the key when there are no subscribers, save memory + return existingList.isEmpty() ? null : existingList; + }); + } } @Override public void onMonitoringModeChanged(final List monitoredItems) { - subscriptionModel.onMonitoringModeChanged(monitoredItems); + // Push mode, monitoring mode change has no effect on active push, no additional processing is + // needed } /////////////////////////////// Conflict detection /////////////////////////////// @@ -573,8 +697,14 @@ public void checkEquals( final String password, final String securityDir, final boolean enableAnonymousAccess, - final Set securityPolicies) { + final Set securityPolicies, + final long debounceTimeMs) { builder.checkEquals( - user, password, Paths.get(securityDir), enableAnonymousAccess, securityPolicies); + user, + password, + Paths.get(securityDir), + enableAnonymousAccess, + securityPolicies, + debounceTimeMs); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java index f029031b6175b..281d6eae77edd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java @@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable { private boolean enableAnonymousAccess; private Set securityPolicies; private DefaultTrustListManager trustListManager; + private long debounceTimeMs; public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { this.tcpBindPort = tcpBindPort; @@ -123,6 +124,15 @@ public OpcUaServerBuilder setSecurityPolicies(final Set security return this; } + public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) { + this.debounceTimeMs = debounceTimeMs; + return this; + } + + public long getDebounceTimeMs() { + return debounceTimeMs; + } + public OpcUaServer build() throws Exception { Files.createDirectories(securityDir); if (!Files.exists(securityDir)) { @@ -314,7 +324,8 @@ void checkEquals( final String password, final Path securityDir, final boolean enableAnonymousAccess, - final Set securityPolicies) { + final Set securityPolicies, + final long debounceTimeMs) { checkEquals("user", this.user, user); checkEquals("password", this.password, password); checkEquals( @@ -323,6 +334,7 @@ void checkEquals( FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString())); checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess); checkEquals("securityPolicies", this.securityPolicies, securityPolicies); + checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs); } private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index 7e1e38e149013..e6f14911e66aa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -242,6 +242,11 @@ public class PipeSinkConstant { "connector.opcua.timeout-seconds"; public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 10L; + public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY = + "connector.opcua.debounce-time-ms"; + public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY = "sink.opcua.debounce-time-ms"; + public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE = 50L; + public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable"; public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable"; public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true;