diff --git a/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp b/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp new file mode 100644 index 000000000..3438b01ff --- /dev/null +++ b/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp @@ -0,0 +1,50 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de), Aleksey Timin (atimin@gmail.com) + */ + +#include "OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h" + +namespace OpcUaStackPubSub +{ + + DataSetReaderBase::DataSetReaderBase(void) + { + } + + DataSetReaderBase::~DataSetReaderBase(void) + { + } + + void + DataSetReaderBase::dataSetReaderId(uint32_t dataSetReaderId) + { + dataSetReaderId_ = dataSetReaderId; + } + + uint32_t + DataSetReaderBase::dataSetReaderId(void) + { + return dataSetReaderId_; + } + + bool + DataSetReaderBase::encodeDataSet(boost::asio::streambuf& dataSet, uint16_t& size) + { + // FIXME: todo + return true; + } + +} diff --git a/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h b/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h new file mode 100644 index 000000000..a3ff0cf34 --- /dev/null +++ b/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h @@ -0,0 +1,50 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de), Aleksey Timin (atimin@gmail.com) + */ + +#ifndef __OpcUaStackPubSub_DataSetReaderBase_h__ +#define __OpcUaStackPubSub_DataSetReaderBase_h__ + +#include + +#include + +#include "OpcUaStackCore/Base/os.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT DataSetReaderBase + { + public: + typedef boost::shared_ptr SPtr; + typedef std::map Map; + + DataSetReaderBase(void); + virtual ~DataSetReaderBase(void); + + void dataSetReaderId(uint32_t dataSetReaderId); + uint32_t dataSetReaderId(void); + + bool encodeDataSet(boost::asio::streambuf& dataSet, uint16_t& size); + + public: + uint32_t dataSetReaderId_ = 0; // unique data set writer id + }; + +} + +#endif diff --git a/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp new file mode 100644 index 000000000..454e991fc --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp @@ -0,0 +1,164 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + + */ + +#include "OpcUaStackPubSub/MessageTransport/MessageTransport.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + MessageTransport::MessageTransport( + const std::string& connectionName, // message bus member name + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ) + : ServerServiceBase() + { + // set parameter + ioThread_ = ioThread; + + // set parameter in server service base + serviceName_ = serviceName; + ServerServiceBase::ioThread_ = ioThread.get(); + strand_ = ioThread->createStrand(); + messageBus_ = messageBus; + connectionName_ = connectionName; + + // register message bus receiver + MessageBusMemberConfig messageBusMemberConfig; + messageBusMemberConfig.strand(strand_); + messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); + + // activate receiver + activateReceiver( + [this, &messageBusMemberConfig](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + // receive message from internal message bus + + auto event = boost::static_pointer_cast(message); + switch (event->eventType()) + { + case EventType::NetworkRecvEvent: + { + NetworkRecvEvent::SPtr event = boost::static_pointer_cast(message); + + std::iostream ios(&(event->streamBuf())); + NetworkMessage networkMessage; + networkMessage.opcUaBinaryDecode(ios); + + auto publisherId = networkMessage.networkMessageHeader()->publisherId(); + + auto it = networkMessageProcessorMap_.find(publisherId); + if (it != networkMessageProcessorMap_.end()) { + auto readerGroupBusMember = messageBus_->registerMember(it->second, messageBusMemberConfig); + messageBus_->messageSend(messageBusMember_, readerGroupBusMember, event); + } else { + Log(Error, "network message processor does not exist for this networkmessage") + .parameter("PublisherId", publisherId); + } + + break; + } + case EventType::NetworkSendEvent: + { + Log(Info, "Recieved NetworkSendEvent"); + NetworkSendEvent::SPtr event = boost::static_pointer_cast(message); + messageBus_->messageSend(messageBusMember_, connectionBusMember_, event); + break; + } + default: + { + Log(Error, "invalid message received in message transport module") + .parameter("ServiceName", serviceName_) + .parameter("Event", (uint32_t)event->eventType()); + } + } + } + ); + } + + MessageTransport::~MessageTransport(void) + { + // deactivate receiver + deactivateReceiver(); + messageBus_->deregisterMember(messageBusMember_); + } + + bool + MessageTransport::startup(void) + { + // get reference to connection from message bus + if (!messageBus_->existMember(connectionName_)) { + Log(Error, "udp connection message bus member don't exist") + .parameter("UdpConnectionName", connectionName_); + return false; + } + connectionBusMember_ = messageBus_->getMember(connectionName_); + + return true; + } + + bool + MessageTransport::shutdown(void) + { + // FIXME: todo + return true; + } + + bool + MessageTransport::registerNetworkMessageProcessor( + uint32_t publisherId, // publisher id + const std::string& networkMessageProcessorName // message bus member name + ) + { + auto it = networkMessageProcessorMap_.find(publisherId); + if (it != networkMessageProcessorMap_.end()) { + Log(Error, "register network message processor error, because network message processor d already exist") + .parameter("MessageTransport", serviceName_) + .parameter("NetworkMessageProcessor PublisherId", publisherId); + return false; + } + + // add network message processor to map + networkMessageProcessorMap_.insert(std::make_pair(publisherId, networkMessageProcessorName)); + return true; + } + + bool + MessageTransport::deregisterNetworkMessageProcessor( + uint32_t publisherId + ) + { + auto it = networkMessageProcessorMap_.find(publisherId); + if (it == networkMessageProcessorMap_.end()) { + Log(Error, "deregister network message processor error, because network message processor does not exist") + .parameter("MessageTransport", serviceName_) + .parameter("NetworkMessageProcessor PublisherId", publisherId); + return false; + } + + networkMessageProcessorMap_.erase(it); + return true; + } + +} diff --git a/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h new file mode 100644 index 000000000..07e0261b0 --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h @@ -0,0 +1,75 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + + DESCRIPTION: + + 1. Subscriber Mode: + The message transport subscriber module receives a message from the communication + module via the internal message bus.The publisher ID is determined from the received + message and the assigned network message processor is determined. The received message + is forwarded to the assigned network message processor module via the internal message + bus. + 2. Publisher Mode + The message transport publisher module receives a message from one ore more + network message writer groups via the internal message bus. The received message + is forwarded to the assigned communication module via the internal message bus. + + */ + +#ifndef __OpcUaStackPubSub_MessageTransport_h__ +#define __OpcUaStackPubSub_MessageTransport_h__ + +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT MessageTransport + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + MessageTransport( + const std::string& connectionName, // message bus member name + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ); + ~MessageTransport(void); + + bool startup(void); + bool shutdown(void); + + bool registerNetworkMessageProcessor( + uint32_t publisherId, // publisher id + const std::string& networkMessageProcessorName // message bus member name + ); + bool deregisterNetworkMessageProcessor( + uint32_t publisherId + ); + + private: + OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread + std::string connectionName_; + OpcUaStackCore::MessageBusMember::WPtr connectionBusMember_; + std::map networkMessageProcessorMap_; + }; + +} + +#endif diff --git a/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.cpp b/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.cpp new file mode 100644 index 000000000..e577e71b5 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.cpp @@ -0,0 +1,84 @@ +#include "OpcUaStackPubSub/NetworkMessage/GroupHeader.h" + +namespace OpcUaStackPubSub +{ + + OpcUaByte& GroupHeader::groupFlags() + { + return groupFlags_; + } + void GroupHeader::groupFlags(OpcUaByte& groupFlags) + { + groupFlags_ = groupFlags; + } + + OpcUaUInt16& GroupHeader::writerGroupId() + { + return writerGroupId_; + } + void GroupHeader::writerGroupId(OpcUaUInt16& writerGroupId) + { + writerGroupId_ = writerGroupId; + } + + OpcUaByte& GroupHeader::groupVersion() + { + return groupVersion_; + } + + void GroupHeader::groupVersion(OpcUaByte& groupVersion) + { + groupVersion_ = groupVersion; + } + + OpcUaUInt16& GroupHeader::networkMessageNumber() + { + return networkMessageNumber_; + } + void GroupHeader::networkMessageNumber(OpcUaUInt16& networkMessageNumber) + { + networkMessageNumber_ = networkMessageNumber; + } + OpcUaUInt16& GroupHeader::sequenceNumber() + { + return sequenceNumber_; + } + void GroupHeader::sequenceNumber(OpcUaUInt16& sequenceNumber) + { + sequenceNumber_ = sequenceNumber; + } + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // + // GroupHeader Encoding and Decoding + // + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + bool + GroupHeader::opcUaBinaryEncode(std::ostream& os) const + { + bool rc = true; + rc &= OpcUaNumber::opcUaBinaryEncode(os, groupFlags_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, writerGroupId_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, groupVersion_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, networkMessageNumber_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, sequenceNumber_); + + return rc; + } + bool + GroupHeader::opcUaBinaryDecode(std::istream& is) + { + bool rc = true; + rc &= OpcUaNumber::opcUaBinaryDecode(is, groupFlags_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, writerGroupId_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, groupVersion_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, networkMessageNumber_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, sequenceNumber_); + return rc; + } + + + + +} \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.h b/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.h new file mode 100644 index 000000000..0b5562095 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.h @@ -0,0 +1,48 @@ +#ifndef __OpcUaStackPubSub_GroupHeader_h__ +#define __OpcUaStackPubSub_GroupHeader_h__ + +#include "OpcUaStackCore/Base/os.h" +#include "OpcUaStackCore/BuildInTypes/OpcUaGuid.h" +#include "OpcUaStackCore/Base/Utility.h" +#include + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + class GroupHeader + { + public: + using SPtr = boost::shared_ptr; + GroupHeader() = default; + virtual ~GroupHeader() = default; + bool opcUaBinaryEncode(std::ostream& os) const; + bool opcUaBinaryDecode(std::istream& is); + + OpcUaByte& groupFlags(); + void groupFlags(OpcUaByte& groupFlags); + + OpcUaUInt16& writerGroupId(); + void writerGroupId(OpcUaUInt16& writerGroupId); + + OpcUaByte& groupVersion(); + void groupVersion(OpcUaByte& groupVersion); + + OpcUaUInt16& networkMessageNumber(); + void networkMessageNumber(OpcUaUInt16& networkMessageNumber); + + OpcUaUInt16& sequenceNumber(); + void sequenceNumber(OpcUaUInt16& sequenceNumber); + + private: + OpcUaByte groupFlags_; + OpcUaUInt16 writerGroupId_; + OpcUaByte groupVersion_; + OpcUaUInt16 networkMessageNumber_; + OpcUaUInt16 sequenceNumber_; + }; + +} + +#endif \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.cpp b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.cpp new file mode 100644 index 000000000..e93101ec5 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.cpp @@ -0,0 +1,71 @@ +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" + +namespace OpcUaStackPubSub +{ + + NetworkMessage::NetworkMessage() + : Object() + , networkMessageHeader_(boost::make_shared()) + , groupHeader_(boost::make_shared()) + { + + } + + NetworkMessage::~NetworkMessage() + { + + } + + void NetworkMessage::addPayLoadItem(uint32_t& dataSetMessage) + { + payLoad_.push_back(dataSetMessage); + } + + + NetworkMessageHeader::SPtr NetworkMessage::networkMessageHeader() + { + return networkMessageHeader_; + } + + void NetworkMessage::networkMessageHeader(const NetworkMessageHeader::SPtr networkMessageHeader) + { + networkMessageHeader_ = networkMessageHeader; + } + + GroupHeader::SPtr NetworkMessage::groupHeader() + { + return groupHeader_; + } + + void NetworkMessage::groupHeader(const GroupHeader::SPtr groupHeader) + { + groupHeader_ = groupHeader; + } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // + // NetworkMessage Encoding and Decoding + // + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + bool + NetworkMessage::opcUaBinaryEncode(std::ostream& os) const + { + bool rc = true; + rc &= networkMessageHeader_->opcUaBinaryEncode(os); + rc &= groupHeader_->opcUaBinaryEncode(os); + rc &= payLoad_.opcUaBinaryEncode(os); + return rc; + } + + bool + NetworkMessage::opcUaBinaryDecode(std::istream& is) + { + bool rc = true; + rc &= networkMessageHeader_->opcUaBinaryDecode(is); + rc &= groupHeader_->opcUaBinaryDecode(is);; + rc &= payLoad_.opcUaBinaryDecode(is); + return rc; + } +} \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.h b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.h new file mode 100644 index 000000000..09e672348 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.h @@ -0,0 +1,48 @@ +#ifndef __OpcUaStackPubSub_NetworkMessage_h__ +#define __OpcUaStackPubSub_NetworkMessage_h__ + +#include "OpcUaStackCore/Base/os.h" +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" +#include "OpcUaStackPubSub/DataSetWriter/DataSetWriterBase.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h" +#include "OpcUaStackPubSub/NetworkMessage/GroupHeader.h" +#include "OpcUaStackCore/BuildInTypes/OpcUaGuid.h" +#include "OpcUaStackCore/Base/Utility.h" +#include + +using namespace OpcUaStackCore; +class NetworkMessageHeader; +class GroupHeader; + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT NetworkMessage + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + NetworkMessage(); + virtual ~NetworkMessage(); + + void addPayLoadItem(uint32_t& dataSetMessage); + NetworkMessageHeader::SPtr networkMessageHeader(); + void networkMessageHeader(const NetworkMessageHeader::SPtr networkMessageHeader); + GroupHeader::SPtr groupHeader(); + void groupHeader(const GroupHeader::SPtr groupHeader); + bool opcUaBinaryEncode(std::ostream& os) const; + bool opcUaBinaryDecode(std::istream& is); + + private: + NetworkMessageHeader::SPtr networkMessageHeader_; + GroupHeader::SPtr groupHeader_; + OpcUaInt32Array payLoad_; + //std::vector payLoad_; //actual payload datatype is DataSetMessage + }; +} + +#endif \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.cpp b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.cpp new file mode 100644 index 000000000..f7a23ad08 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.cpp @@ -0,0 +1,93 @@ +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h" + +namespace OpcUaStackPubSub +{ + + OpcUaByte& NetworkMessageHeader::uadpVersionAndFlags() + { + //OpcUaByte uadpVersionAndFlags; + //uadpVersionAndFlags &= uadpVersion_; + //uadpVersionAndFlags = (uadpVersionAndFlags<<4) & uadpFlags_; + return uadpVersionAndFlags_; + } + + void NetworkMessageHeader::uadpVersionAndFlags(const OpcUaByte& uadpVersionAndFlags) + { + //uadpVersion_ = uadpVersionAndFlags & 0x0F; + //uadpFlags_ = uadpVersionAndFlags>>4 & 0x0F; + uadpVersionAndFlags_ = uadpVersionAndFlags; + } + + OpcUaByte& NetworkMessageHeader::extendedFlags1() + { + return extendedFlags1_; + } + void NetworkMessageHeader::extendedFlags1(const OpcUaByte& extendedFlags1) + { + extendedFlags1_ = extendedFlags1; + } + + OpcUaByte& NetworkMessageHeader::extendedFlags2() + { + return extendedFlags2_; + } + void NetworkMessageHeader::extendedFlags2(const OpcUaByte& extendedFlags2) + { + extendedFlags2_ = extendedFlags2; + } + + OpcUaUInt32& NetworkMessageHeader::publisherId() + { + return publisherId_; + } + void NetworkMessageHeader::publisherId(const OpcUaUInt32& publisherId) + { + publisherId_ = publisherId; + } + + OpcUaGuid& NetworkMessageHeader::dataSetClassId() + { + return dataSetClassId_; + } + + void NetworkMessageHeader::dataSetClassId(const OpcUaGuid& dataSetClassId) + { + dataSetClassId_ = dataSetClassId; + } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // + // NetworkMessageHeader Encoding and Decoding + // + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + bool + NetworkMessageHeader::opcUaBinaryEncode(std::ostream& os) const + { + bool rc = true; + rc &= OpcUaNumber::opcUaBinaryEncode(os, uadpVersionAndFlags_); + //rc &= OpcUaNumber::opcUaBinaryEncode(os, uadpFlags_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, extendedFlags1_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, extendedFlags2_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, publisherId_); + rc &= dataSetClassId_.opcUaBinaryEncode(os); + return rc; + } + + bool + NetworkMessageHeader::opcUaBinaryDecode(std::istream& is) + { + bool rc = true; + //OpcUaByte versionFlags; + rc &= OpcUaNumber::opcUaBinaryDecode(is, uadpVersionAndFlags_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, extendedFlags1_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, extendedFlags2_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, publisherId_); + //uadpVersion_ = versionFlags & 0x0F; + //uadpFlags_ = versionFlags>>4 & 0x0F; + return rc; + } + + +} \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h new file mode 100644 index 000000000..dfd695b16 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h @@ -0,0 +1,45 @@ +#ifndef __OpcUaStackPubSub_NetworkMessageHeader_h__ +#define __OpcUaStackPubSub_NetworkMessageHeader_h__ + +#include "OpcUaStackCore/BuildInTypes/OpcUaGuid.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + class NetworkMessageHeader + { + public: + using SPtr = boost::shared_ptr; + NetworkMessageHeader() = default; + virtual ~NetworkMessageHeader() = default; + bool opcUaBinaryEncode(std::ostream& os) const; + bool opcUaBinaryDecode(std::istream& is); + + OpcUaByte& uadpVersionAndFlags(); + void uadpVersionAndFlags(const OpcUaByte& uadpVersionAndFlags); + + OpcUaByte& extendedFlags1(); + void extendedFlags1(const OpcUaByte& extendedFlags1); + + OpcUaByte& extendedFlags2(); + void extendedFlags2(const OpcUaByte& extendedFlags2); + + OpcUaUInt32& publisherId(); + void publisherId(const OpcUaUInt32& publisherId); + + OpcUaGuid& dataSetClassId(); + void dataSetClassId(const OpcUaGuid& dataSetClassId); + + + private: + OpcUaByte uadpVersionAndFlags_ = 1; + //OpcUaByte uadpFlags_ :4; + OpcUaByte extendedFlags1_ = 0; + OpcUaByte extendedFlags2_ = 0; + OpcUaUInt32 publisherId_ = 0; + OpcUaGuid dataSetClassId_; + }; +} + +#endif \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.cpp b/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.cpp new file mode 100644 index 000000000..a4446b93b --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.cpp @@ -0,0 +1,221 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + */ + +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + NetworkMessageProcessor::NetworkMessageProcessor( + const std::string& messageProcessorName, + const std::string& messageTransportName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ) + : ServerServiceBase() + { + Log(Info, "Constructor") + .parameter("ServiceName", serviceName_); + + messageTransportName_ = messageTransportName; + + // set parameter in server service base + serviceName_ = messageProcessorName; + ServerServiceBase::ioThread_ = ioThread.get(); + strand_ = ioThread->createStrand(); + messageBus_ = messageBus; + + // register message bus receiver + MessageBusMemberConfig messageBusMemberConfig; + messageBusMemberConfig.strand(strand_); + messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); + + // activate receiver + activateReceiver( + [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + // receive message from internal message bus + auto event = boost::static_pointer_cast(message); + switch (event->eventType()) + { + case EventType::NetworkRecvEvent: + { + break; + } + case EventType::NetworkSendEvent: + { + break; + } + }; + } + ); + } + + NetworkMessageProcessor::~NetworkMessageProcessor(void) + { + // deactivate receiver + deactivateReceiver(); + messageBus_->deregisterMember(messageBusMember_); + } + + void + NetworkMessageProcessor::readerGroupId(uint16_t readerGroupId) + { + readerGroupId_ = readerGroupId; + } + + bool + NetworkMessageProcessor::registerDataSetReader(uint16_t dataSetReaderId, DataSetReaderBase::SPtr& dataSetReader) + { + // check if data set reader already exist + auto it = dataSetReaderMap_.find(dataSetReaderId); + if (it != dataSetReaderMap_.end()) { + Log(Error, "register data set writer error, because data set writer id already exist") + .parameter("ReaderGroupName", serviceName_) + .parameter("DataSetReaderId", dataSetReaderId); + return false; + } + // add data set writer to map + dataSetReaderMap_.insert(std::make_pair(dataSetReaderId, dataSetReader)); + return true; + } + + bool + NetworkMessageProcessor::registerDataSetReaderSync(uint16_t dataSetReaderId, DataSetReaderBase::SPtr& dataSetReader) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise, dataSetReaderId, dataSetReader](void) mutable { + bool rc = registerDataSetReader(dataSetReaderId, dataSetReader); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return registerDataSetReader(dataSetReaderId, dataSetReader); + } + + bool + NetworkMessageProcessor::deregisterDataSetReader(uint16_t dataSetReaderId) + { + // check if data set writer exist + auto it = dataSetReaderMap_.find(dataSetReaderId); + if (it == dataSetReaderMap_.end()) { + Log(Error, "deregister data set writer error, because data set writer id not exist") + .parameter("ReaderGroupName", serviceName_) + .parameter("DataSetReaderId", dataSetReaderId); + return false; + } + + dataSetReaderMap_.erase(it); + return true; + } + + bool + NetworkMessageProcessor::startupSync(void) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise](void) mutable { + bool rc = startup(); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return startup(); + } + + bool + NetworkMessageProcessor::shutdownSync(void) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise](void) mutable { + bool rc = shutdown(); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return shutdown(); + } + bool + NetworkMessageProcessor::deregisterDataSetReaderSync(uint16_t dataSetReaderId) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise, dataSetReaderId](void) mutable { + bool rc = deregisterDataSetReader(dataSetReaderId); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return deregisterDataSetReaderSync(dataSetReaderId); + } + + bool + NetworkMessageProcessor::startup(void) + { + // check if the call is made within the strand + // FIXME: todo + Log(Info, "Startup").parameter("ServiceName", serviceName_); + + // get reference to message transport bus member + if (!messageBus_->existMember(messageTransportName_)) { + Log(Error, "message transport message bus member don't exist") + .parameter("MessageTransportName", messageTransportName_); + return false; + } + messageTransportBusMember_ = messageBus_->getMember(messageTransportName_); + + return true; + } + + bool + NetworkMessageProcessor::shutdown(void) + { + // check if the call is made within the strand + // FIXME: todo + + return true; + } +} diff --git a/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h b/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h new file mode 100644 index 000000000..28b3a4ed5 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h @@ -0,0 +1,70 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + */ + +#ifndef __OpcUaStackPubSub_NetworkMessageProcessor_h__ +#define __OpcUaStackPubSub_NetworkMessageProcessor_h__ + +#include + +#include "OpcUaStackCore/Base/os.h" +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" +#include "OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT NetworkMessageProcessor + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + NetworkMessageProcessor( + const std::string& messageProcessorName, + const std::string& messageTransportName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ); + virtual ~NetworkMessageProcessor(void); + + void readerGroupId(uint16_t readerGroupId); + bool registerDataSetReaderSync(uint16_t dataSetReaderId, DataSetReaderBase::SPtr& dataSetReader); + bool deregisterDataSetReaderSync(uint16_t dataSetReaderId); + + bool startupSync(void); + bool shutdownSync(void); + + private: + uint16_t readerGroupId_ = 0; + std::string messageTransportName_ = ""; + OpcUaStackCore::MessageBusMember::WPtr messageTransportBusMember_; + + DataSetReaderBase::Map dataSetReaderMap_; + + bool startup(); + bool shutdown(); + + bool registerDataSetReader(uint16_t dataSetReaderId, DataSetReaderBase::SPtr& dataSetReader); + bool deregisterDataSetReader(uint16_t dataSetReaderId); + }; + +} + +#endif diff --git a/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.cpp b/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.cpp index a776230a8..a96379166 100644 --- a/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.cpp +++ b/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.cpp @@ -17,6 +17,7 @@ #include "OpcUaStackCore/Base/Log.h" #include "OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" using namespace OpcUaStackCore; @@ -31,6 +32,9 @@ namespace OpcUaStackPubSub ) : ServerServiceBase() { + Log(Info, "Constructor") + .parameter("ServiceName", serviceName_); + messageTransportName_ = messageTransportName; // set parameter in server service base @@ -78,7 +82,6 @@ namespace OpcUaStackPubSub .parameter("DataSetWriterId", dataSetWriterId); return false; } - // add data set writer to map dataSetWriterMap_.insert(std::make_pair(dataSetWriterId, dataSetWriter)); return true; @@ -139,12 +142,11 @@ namespace OpcUaStackPubSub return deregisterDataSetWriterSync(dataSetWriterId); } - + bool - NetworkMessageWriterGroup::startupSync(void) + NetworkMessageWriterGroup::startup(void) { - // check if the call is made within the strand - // FIXME: todo + Log(Info, "Startup").parameter("ServiceName", serviceName_); // get reference to message transport bus member if (!messageBus_->existMember(messageTransportName_)) { @@ -152,26 +154,63 @@ namespace OpcUaStackPubSub .parameter("MessageTransportName", messageTransportName_); return false; } + messageTransportBusMember_ = messageBus_->getMember(messageTransportName_); // start publish loop - // FIXME: todo + while(!leavePublishLoop_) { + publishLoop(); + boost::this_thread::sleep(boost::posix_time::seconds(1)); + } + return true; + } - // FIXME: todo + bool + NetworkMessageWriterGroup::shutdown(void) + { + // stop publish loop + leavePublishLoop_ = true; return true; } bool - NetworkMessageWriterGroup::shutdownSync(void) + NetworkMessageWriterGroup::startupSync(void) { // check if the call is made within the strand - // FIXME: todo + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise](void) mutable { + bool rc = startup(); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } - // stop publish loop - // FIXME: todo + return startup(); + } - // FIXME: todo - return true; + bool + NetworkMessageWriterGroup::shutdownSync(void) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise](void) mutable { + bool rc = shutdown(); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return shutdown(); } void @@ -198,8 +237,26 @@ namespace OpcUaStackPubSub // create network message // FIXME: todo + auto networkMessage = boost::make_shared(); + auto networkMessageHeader = boost::make_shared(); + + OpcUaByte uadpVersionAndFlags; + uadpVersionAndFlags = 0b0001; //Flags value --> only publisherId(bit 4) is enabled + uadpVersionAndFlags = uadpVersionAndFlags<<4 & 1; //version: default 1; + networkMessageHeader->uadpVersionAndFlags(uadpVersionAndFlags); + + OpcUaInt32 publisherId = 25; //temporary data + networkMessageHeader->publisherId(publisherId); + + networkMessage->networkMessageHeader(networkMessageHeader); + uint32_t dataSetMessage = 2525; //temporary data + networkMessage->addPayLoadItem(dataSetMessage); // send network message to message transport module + auto event = boost::make_shared(); + std::ostream os(&event->streamBuf()); + networkMessage->opcUaBinaryEncode(os); + messageBus_->messageSend(messageBusMember_, messageTransportBusMember_, event); } } diff --git a/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h b/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h index 36e5a40d6..90b0f271d 100644 --- a/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h +++ b/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h @@ -23,6 +23,8 @@ #include "OpcUaStackCore/Base/os.h" #include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" #include "OpcUaStackPubSub/DataSetWriter/DataSetWriterBase.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" namespace OpcUaStackPubSub { @@ -32,7 +34,7 @@ namespace OpcUaStackPubSub , public OpcUaStackServer::ServerServiceBase { public: - typedef boost::shared_ptr SPtr; + using SPtr = boost::shared_ptr; NetworkMessageWriterGroup( const std::string& writerGroupName, @@ -49,18 +51,25 @@ namespace OpcUaStackPubSub bool startupSync(void); bool shutdownSync(void); - public: + private: uint16_t writerGroupId_ = 0; std::string messageTransportName_ = ""; OpcUaStackCore::MessageBusMember::WPtr messageTransportBusMember_; + bool leavePublishLoop_ = false; DataSetWriterBase::Map dataSetWriterMap_; + bool startup(); + bool shutdown(); bool registerDataSetWriter(uint16_t dataSetWriterId, DataSetWriterBase::SPtr& dataSetWriter); bool deregisterDataSetWriter(uint16_t dataSetWriterId); void publishLoop(void); + }; + + + } #endif diff --git a/tst/OpcUaStackPubSub/Connection/MessageTransport_t.cpp b/tst/OpcUaStackPubSub/Connection/MessageTransport_t.cpp new file mode 100644 index 000000000..d1cdcb36b --- /dev/null +++ b/tst/OpcUaStackPubSub/Connection/MessageTransport_t.cpp @@ -0,0 +1,123 @@ +#include "unittest.h" + +#include + +#include "OpcUaStackPubSub/Connection/UDPConnection.h" +#include "OpcUaStackPubSub/MessageTransport/MessageTransport.h" +#include "OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" +//#include "OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h" + +using namespace OpcUaStackCore; +using namespace OpcUaStackPubSub; + +class DLLEXPORT TestUDPConnection +{ + public: + + TestUDPConnection(std::promise* promise) { + promise_ = promise; + } + + ~TestUDPConnection(void) { + } + + void + receive(MessageBusError error, const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + event_ = boost::static_pointer_cast(message); + promise_->set_value(); + } + + std::promise* promise_ = nullptr; + NetworkRecvEvent::SPtr event_; +}; + +BOOST_AUTO_TEST_SUITE(MessageTransport_) + +BOOST_AUTO_TEST_CASE(MessageTransport_) +{ + std::cout << "MessageTransport_t" << std::endl; +} + + +BOOST_AUTO_TEST_CASE(MessageTransport_send_receive_message) +{ + // start thread pool + auto ioThread = boost::make_shared("TestThread1"); + ioThread->numberThreads(6); + BOOST_REQUIRE(ioThread->startup() == true); + + // start message bus + MessageBusConfig messageBusConfig; + messageBusConfig.ioThread(ioThread); + MessageBus::SPtr messageBus = boost::make_shared(messageBusConfig); + + // register sender and receiver + auto udpConnectionMember = messageBus->registerMember("UDPConnection"); + auto messageTransportMember = messageBus->registerMember("MessageTransport"); + auto receiver = messageBus->registerMember("receiver"); + + // create udp connection + auto udpConnection = boost::make_shared( + "127.0.0.1", // receiver address + 5678, // receiver port + "127.0.0.1", // target address + 5679, // target port + "MessageTransport", // message bus name of the message transport module + "UDPConnection", // message bus name of the udp connection module + ioThread, // thread + messageBus // message bus + ); + udpConnection->startup(); + + auto messageTransport = boost::make_shared( + "receiver", + "MessageTransport", + ioThread, + messageBus + ); + messageTransport->startup(); + + // receive + std::promise promise; + auto future = promise.get_future(); + TestUDPConnection testCon(&promise); + + messageBus->messageReceive( + receiver, + [&testCon](MessageBusError error, const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) mutable { + testCon.receive(error, handleFrom, message); + } + ); + + // send network message to message transport module + auto event = boost::make_shared(); + std::ostream os(&event->streamBuf()); + os << "Dies ist eine Message"; + + messageBus->messageSend(udpConnectionMember, messageTransportMember, event); + + // wait for receiver to finish + BOOST_REQUIRE(future.wait_for(std::chrono::seconds(1)) == std::future_status::ready); + + // check received message + boost::asio::streambuf::const_buffers_type bufs = testCon.event_->streamBuf().data(); + std::string str( + boost::asio::buffers_begin(bufs), + boost::asio::buffers_begin(bufs) + testCon.event_->streamBuf().size() + ); + + BOOST_REQUIRE(str == std::string("Dies ist eine Message")); + + messageTransport->shutdown(); + messageTransport.reset(); + + // delete udp connection + udpConnection->shutdown(); + udpConnection.reset(); + + // stop thread pool + BOOST_REQUIRE(ioThread->shutdown() == true); +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file