Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public void testSingleEnv() throws Exception {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"drop pipe a2b_history",
"drop pipe a2b_realtime",
"drop pipe if exists a2b_history",
"drop pipe if exists a2b_realtime",
String.format(
"create pipe a2b1 with source ('inclusion'='schema') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
Expand Down Expand Up @@ -109,6 +111,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
Expand Down Expand Up @@ -308,6 +311,10 @@ private void customizeServer(final PipeParameters parameters) {
if (securityPolicies.isEmpty()) {
throw new PipeException("The security policy cannot be empty.");
}
final long debounceTimeMs =
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY, SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY),
CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE);

synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
serverKey = httpsBindPort + ":" + tcpBindPort;
Expand All @@ -327,7 +334,8 @@ private void customizeServer(final PipeParameters parameters) {
.setPassword(password)
.setSecurityDir(securityDir)
.setEnableAnonymousAccess(enableAnonymousAccess)
.setSecurityPolicies(securityPolicies);
.setSecurityPolicies(securityPolicies)
.setDebounceTimeMs(debounceTimeMs);
final OpcUaServer newServer = builder.build();
nameSpace = new OpcUaNameSpace(newServer, builder);
nameSpace.startup();
Expand All @@ -341,7 +349,8 @@ private void customizeServer(final PipeParameters parameters) {
password,
securityDir,
enableAnonymousAccess,
securityPolicies);
securityPolicies,
debounceTimeMs);
return oldValue;
}
} catch (final PipeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.eclipse.milo.opcua.sdk.server.nodes.UaFolderNode;
import org.eclipse.milo.opcua.sdk.server.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.server.nodes.UaVariableNode;
import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
Expand All @@ -57,6 +57,7 @@
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -71,20 +72,34 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
private final SubscriptionModel subscriptionModel;
private final OpcUaServerBuilder builder;

// Do not use subscription model because the original subscription model has some bugs
private final ConcurrentMap<NodeId, List<DataItem>> nodeSubscriptions = new ConcurrentHashMap<>();

// Debounce task cache: used to merge updates within a short period of time, avoiding unnecessary
// duplicate pushes
private final ConcurrentMap<NodeId, ScheduledFuture<?>> debounceTasks = new ConcurrentHashMap<>();
// Debounce interval: within 10ms, the same node is updated multiple times, and only the last one
// will be pushed (can be adjusted according to your site delay requirements, the minimum can be
// set to 1ms)
private final long debounceIntervalMs;

public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) {
super(server, NAMESPACE_URI);
this.builder = builder;
debounceIntervalMs = builder.getDebounceTimeMs();

subscriptionModel = new SubscriptionModel(server, this);
getLifecycleManager().addLifecycle(subscriptionModel);
getLifecycleManager()
.addLifecycle(
new Lifecycle() {
Expand Down Expand Up @@ -191,7 +206,7 @@
throws Exception;
}

private void transferTabletRowForClientServerModel(

Check warning on line 209 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 117 to 64, Complexity from 23 to 14, Nesting Level from 3 to 2, Number of Variables from 23 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ2qmKMgJZ8VGf1mEBeM&open=AZ2qmKMgJZ8VGf1mEBeM&pullRequest=17525
final String[] segments,
final List<IMeasurementSchema> measurementSchemas,
final List<Long> timestamps,
Expand Down Expand Up @@ -291,7 +306,7 @@
if (Objects.isNull(measurementNode.getValue())
|| Objects.isNull(measurementNode.getValue().getSourceTime())
|| measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) {
measurementNode.setValue(dataValue);
notifyNodeValueChange(measurementNode.getNodeId(), dataValue, measurementNode);
}
} else {
value = values.get(i);
Expand All @@ -311,9 +326,11 @@
if (Objects.isNull(valueNode.getValue())
|| Objects.isNull(valueNode.getValue().getSourceTime())
|| valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
valueNode.setValue(
notifyNodeValueChange(
valueNode.getNodeId(),
new DataValue(
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()));
new Variant(value), currentQuality, new DateTime(timestamp), new DateTime()),
valueNode);
}
}
}
Expand Down Expand Up @@ -399,7 +416,7 @@
* @param tablet the tablet to send
* @throws UaException if failed to create {@link Event}
*/
private void transferTabletForPubSubModel(

Check warning on line 419 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 102 to 64, Complexity from 25 to 14, Nesting Level from 4 to 2, Number of Variables from 12 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ2qmKMgJZ8VGf1mEBeN&open=AZ2qmKMgJZ8VGf1mEBeN&pullRequest=17525
final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) throws UaException {
final BaseEventTypeNode eventNode =
getServer()
Expand Down Expand Up @@ -546,24 +563,131 @@
}
}

/**

Check warning on line 566 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

First sentence of Javadoc is missing an ending period.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ2qmKMgJZ8VGf1mEBeO&open=AZ2qmKMgJZ8VGf1mEBeO&pullRequest=17525
* On point value changing, notify all subscribed clients proactively
*
* @param nodeId NodeId of the changing node
* @param newValue New value of the node (DataValue object containing value, status code, and
* timestamp)
* @param variableNode Corresponding UaVariableNode instance, used to update the local cached
* value of the node
*/
public void notifyNodeValueChange(
NodeId nodeId, DataValue newValue, UaVariableNode variableNode) {
// 1. Update the local cached value of the node
variableNode.setValue(newValue);

// 2. If there are no subscribers, return directly without doing any extra operations
List<DataItem> subscribedItems = nodeSubscriptions.get(nodeId);
if (subscribedItems == null || subscribedItems.isEmpty()) {
return;
}

// 2. Debounce+Async Push: Asynchronously push the expensive push operation, while merging
// high-frequency repeated updates
debounceTasks.compute(
nodeId,
(k, oldTask) -> {
// If there is already a pending push task, cancel it, we only need the latest value
if (oldTask != null && !oldTask.isDone()) {
oldTask.cancel(false);
}

// Submit the push task to the Milo's scheduled thread pool, delay DEBOUNCE_INTERVAL_MS
// execution
return getServer()
.getScheduledExecutorService()
.schedule(
() -> {
try {
// Batch push changes to all subscribers, this time-consuming operation is put
// into the thread pool, not blocking your data update thread
for (DataItem item : subscribedItems) {
try {
item.setValue(newValue);
} catch (Exception e) {
// Single client push failure does not affect other clients
LOGGER.warn(
"Failed to push value change to client, nodeId={}", nodeId, e);
}
}
} finally {
// Task execution completed, clean up the debounce cache
debounceTasks.remove(nodeId);
}
},
debounceIntervalMs,
TimeUnit.MILLISECONDS);
});
}

@Override
public void onDataItemsCreated(final List<DataItem> dataItems) {
subscriptionModel.onDataItemsCreated(dataItems);
for (DataItem item : dataItems) {
final ReadValueId readValueId = item.getReadValueId();
// Only handle Value attribute subscription (align with the original SubscriptionModel logic,
// ignore other attribute subscriptions)
if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
continue;
}
final NodeId nodeId = readValueId.getNodeId();

// 1. Add the new subscription item to the subscription mapping
nodeSubscriptions.compute(
nodeId,
(k, existingList) -> {
List<DataItem> list =
existingList != null ? existingList : new CopyOnWriteArrayList<>();
list.add(item);
return list;
});

// 2. 【Key Optimization】Proactively push the current node's initial value when the new
// subscription item is created
// Eliminate Bad_WaitingForInitialData, no need to wait for any polling
try {
UaVariableNode node = (UaVariableNode) getNodeManager().getNode(nodeId).orElse(null);
if (node != null && node.getValue() != null) {
// Immediately push the current value to the new subscriber, the client will instantly be
// able to get the initial data
item.setValue(node.getValue());
}
} catch (Exception e) {
LOGGER.warn("Failed to send initial value to new subscription, nodeId={}", nodeId, e);
}
}
}

@Override
public void onDataItemsModified(final List<DataItem> dataItems) {
subscriptionModel.onDataItemsModified(dataItems);
// Push mode, client modifies subscription parameters (e.g. sampling interval) has no effect on
// our active push, no additional processing is needed
}

@Override
public void onDataItemsDeleted(final List<DataItem> dataItems) {
subscriptionModel.onDataItemsDeleted(dataItems);
for (DataItem item : dataItems) {
final ReadValueId readValueId = item.getReadValueId();
if (!AttributeId.Value.isEqual(readValueId.getAttributeId())) {
continue;
}
final NodeId nodeId = readValueId.getNodeId();

// When the client cancels the subscription, remove this subscription item from the mapping
nodeSubscriptions.computeIfPresent(
nodeId,
(k, existingList) -> {
existingList.remove(item);
// Automatically clean up the key when there are no subscribers, save memory
return existingList.isEmpty() ? null : existingList;
});
}
}

@Override
public void onMonitoringModeChanged(final List<MonitoredItem> monitoredItems) {
subscriptionModel.onMonitoringModeChanged(monitoredItems);
// Push mode, monitoring mode change has no effect on active push, no additional processing is
// needed
}

/////////////////////////////// Conflict detection ///////////////////////////////
Expand All @@ -573,8 +697,14 @@
final String password,
final String securityDir,
final boolean enableAnonymousAccess,
final Set<SecurityPolicy> securityPolicies) {
final Set<SecurityPolicy> securityPolicies,
final long debounceTimeMs) {
builder.checkEquals(
user, password, Paths.get(securityDir), enableAnonymousAccess, securityPolicies);
user,
password,
Paths.get(securityDir),
enableAnonymousAccess,
securityPolicies,
debounceTimeMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class OpcUaServerBuilder implements Closeable {
private boolean enableAnonymousAccess;
private Set<SecurityPolicy> securityPolicies;
private DefaultTrustListManager trustListManager;
private long debounceTimeMs;

public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) {
this.tcpBindPort = tcpBindPort;
Expand Down Expand Up @@ -123,6 +124,15 @@ public OpcUaServerBuilder setSecurityPolicies(final Set<SecurityPolicy> security
return this;
}

public OpcUaServerBuilder setDebounceTimeMs(long debounceTimeMs) {
this.debounceTimeMs = debounceTimeMs;
return this;
}

public long getDebounceTimeMs() {
return debounceTimeMs;
}

public OpcUaServer build() throws Exception {
Files.createDirectories(securityDir);
if (!Files.exists(securityDir)) {
Expand Down Expand Up @@ -314,7 +324,8 @@ void checkEquals(
final String password,
final Path securityDir,
final boolean enableAnonymousAccess,
final Set<SecurityPolicy> securityPolicies) {
final Set<SecurityPolicy> securityPolicies,
final long debounceTimeMs) {
checkEquals("user", this.user, user);
checkEquals("password", this.password, password);
checkEquals(
Expand All @@ -323,6 +334,7 @@ void checkEquals(
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess);
checkEquals("securityPolicies", this.securityPolicies, securityPolicies);
checkEquals("debounceTimeMs", this.debounceTimeMs, debounceTimeMs);
}

private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ public class PipeSinkConstant {
"connector.opcua.timeout-seconds";
public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 10L;

public static final String CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_KEY =
"connector.opcua.debounce-time-ms";
public static final String SINK_OPC_UA_DEBOUNCE_TIME_MS_KEY = "sink.opcua.debounce-time-ms";
public static final long CONNECTOR_OPC_UA_DEBOUNCE_TIME_MS_DEFAULT_VALUE = 50L;

public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable";
public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable";
public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true;
Expand Down
Loading