From f1bf69ade51432f064dff3aab192a56b405d49b6 Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 15:07:58 +0100 Subject: [PATCH 01/11] add class MsgTransProvider --- .../MessageTransport/MsgTransProvider.cpp | 77 +++++++++++++++++++ .../MessageTransport/MsgTransProvider.h | 50 ++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp create mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp new file mode 100644 index 000000000..f70fbc94e --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp @@ -0,0 +1,77 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + */ +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/MessageTransport/MsgTransProvider.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + MsgTransProvider::MsgTransProvider( + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ) + : ServerServiceBase() + { + // set parameter + ioThread_ = ioThread; + + // set parameter in server service base + serviceName_ = serviceName; + ServerServiceBase::ioThread_ = ioThread.get(); + strand_ = ioThread->createStrand(); + messageBus_ = messageBus; + + // register message bus receiver + MessageBusMemberConfig messageBusMemberConfig; + messageBusMemberConfig.strand(strand_); + messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); + + // activate receiver + activateReceiver( + [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + // receive message from internal message bus + + // FIXME: todo + } + ); + } + + MsgTransProvider::~MsgTransProvider(void) + { + // deactivate receiver + deactivateReceiver(); + messageBus_->deregisterMember(messageBusMember_); + } + + bool + MsgTransProvider::startup(void) + { + // FIXME: todo + return true; + } + + bool + MsgTransProvider::shutdown(void) + { + // FIXME: todo + return true; + } + +} diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h new file mode 100644 index 000000000..731c2523d --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h @@ -0,0 +1,50 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + + */ + +#ifndef __OpcUaStackPubSub_MsgTransProvider_h__ +#define __OpcUaStackPubSub_MsgTransProvider_h__ + +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT MsgTransProvider + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + MsgTransProvider( + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ); + ~MsgTransProvider(void); + + bool startup(void); + bool shutdown(void); + + private: + OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread + }; + +} + +#endif From 85adf23a866f437324891e5ad6ecfabbc6893547 Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 15:10:42 +0100 Subject: [PATCH 02/11] add class MsgTransSubscriber --- .../MessageTransport/MsgTransSubscriber.cpp | 77 +++++++++++++++++++ .../MessageTransport/MsgTransSubscriber.h | 50 ++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp create mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp new file mode 100644 index 000000000..293d84843 --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp @@ -0,0 +1,77 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + */ +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + MsgTransSubscriber::MsgTransSubscriber( + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ) + : ServerServiceBase() + { + // set parameter + ioThread_ = ioThread; + + // set parameter in server service base + serviceName_ = serviceName; + ServerServiceBase::ioThread_ = ioThread.get(); + strand_ = ioThread->createStrand(); + messageBus_ = messageBus; + + // register message bus receiver + MessageBusMemberConfig messageBusMemberConfig; + messageBusMemberConfig.strand(strand_); + messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); + + // activate receiver + activateReceiver( + [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + // receive message from internal message bus + + // FIXME: todo + } + ); + } + + MsgTransSubscriber::~MsgTransSubscriber(void) + { + // deactivate receiver + deactivateReceiver(); + messageBus_->deregisterMember(messageBusMember_); + } + + bool + MsgTransSubscriber::startup(void) + { + // FIXME: todo + return true; + } + + bool + MsgTransSubscriber::shutdown(void) + { + // FIXME: todo + return true; + } + +} diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h new file mode 100644 index 000000000..cca918cee --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h @@ -0,0 +1,50 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + + */ + +#ifndef __OpcUaStackPubSub_MsgTransSubscriber_h__ +#define __OpcUaStackPubSub_MsgTransSubscriber_h__ + +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT MsgTransSubscriber + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + MsgTransSubscriber( + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ); + ~MsgTransSubscriber(void); + + bool startup(void); + bool shutdown(void); + + private: + OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread + }; + +} + +#endif From 56434b1a6a160befc4c6e270d450d27bff50ee49 Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 15:36:25 +0100 Subject: [PATCH 03/11] add some interface functions --- ...ransProvider.cpp => MsgTransPublisher.cpp} | 13 +++++++------ ...MsgTransProvider.h => MsgTransPublisher.h} | 19 +++++++++++++------ .../MessageTransport/MsgTransSubscriber.cpp | 19 +++++++++++++++++++ .../MessageTransport/MsgTransSubscriber.h | 16 ++++++++++++++++ 4 files changed, 55 insertions(+), 12 deletions(-) rename src/OpcUaStackPubSub/MessageTransport/{MsgTransProvider.cpp => MsgTransPublisher.cpp} (87%) rename src/OpcUaStackPubSub/MessageTransport/{MsgTransProvider.h => MsgTransPublisher.h} (67%) diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp similarity index 87% rename from src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp rename to src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp index f70fbc94e..8a3d43f90 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp @@ -14,15 +14,16 @@ Autor: Kai Huebl (kai@huebl-sgh.de) */ -#include "OpcUaStackCore/Base/Log.h" -#include "OpcUaStackPubSub/MessageTransport/MsgTransProvider.h" + +#include "OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h" using namespace OpcUaStackCore; namespace OpcUaStackPubSub { - MsgTransProvider::MsgTransProvider( + MsgTransPublisher::MsgTransPublisher( + const std::string& connectionName, const std::string& serviceName, OpcUaStackCore::IOThread::SPtr& ioThread, OpcUaStackCore::MessageBus::SPtr& messageBus @@ -53,7 +54,7 @@ namespace OpcUaStackPubSub ); } - MsgTransProvider::~MsgTransProvider(void) + MsgTransPublisher::~MsgTransPublisher(void) { // deactivate receiver deactivateReceiver(); @@ -61,14 +62,14 @@ namespace OpcUaStackPubSub } bool - MsgTransProvider::startup(void) + MsgTransPublisher::startup(void) { // FIXME: todo return true; } bool - MsgTransProvider::shutdown(void) + MsgTransPublisher::shutdown(void) { // FIXME: todo return true; diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h similarity index 67% rename from src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h rename to src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h index 731c2523d..087cd16c7 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h @@ -14,29 +14,36 @@ Autor: Kai Huebl (kai@huebl-sgh.de) + DESCRIPTION: + + The message transport publisher module receives a message from one ore more + network message writer groups via the internal message bus. The received message + is forwarded to the assigned communication module via the internal message bus. + */ -#ifndef __OpcUaStackPubSub_MsgTransProvider_h__ -#define __OpcUaStackPubSub_MsgTransProvider_h__ +#ifndef __OpcUaStackPubSub_MsgTransPublisher_h__ +#define __OpcUaStackPubSub_MsgTransPublisher_h__ #include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" namespace OpcUaStackPubSub { - class DLLEXPORT MsgTransProvider + class DLLEXPORT MsgTransPublisher : public OpcUaStackCore::Object , public OpcUaStackServer::ServerServiceBase { public: - using SPtr = boost::shared_ptr; + using SPtr = boost::shared_ptr; - MsgTransProvider( + MsgTransPublisher( + const std::string& connectionName, // message bus member name const std::string& serviceName, OpcUaStackCore::IOThread::SPtr& ioThread, OpcUaStackCore::MessageBus::SPtr& messageBus ); - ~MsgTransProvider(void); + ~MsgTransPublisher(void); bool startup(void); bool shutdown(void); diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp index 293d84843..767bf61e3 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp @@ -74,4 +74,23 @@ namespace OpcUaStackPubSub return true; } + bool + MsgTransSubscriber::registerNetworkMessageProcessor( + uint32_t publisherId, // publisher id + const std::string& networkMessageProcessorName // message bus member name + ) + { + // FIXME: todo + return true; + } + + bool + MsgTransSubscriber::deregisterNetworkMessageProcessor( + uint32_t publisherId + ) + { + // FIXME: todo + return true; + } + } diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h index cca918cee..43de15af5 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h @@ -14,6 +14,14 @@ Autor: Kai Huebl (kai@huebl-sgh.de) + DESCRIPTION: + + The message transport subscriber module receives a message from the communication + module via the internal message bus.The publisher ID is determined from the received + message and the assigned network message processor is determined. The received message + is forwarded to the assigned network message processor module via the internal message + bus. + */ #ifndef __OpcUaStackPubSub_MsgTransSubscriber_h__ @@ -41,6 +49,14 @@ namespace OpcUaStackPubSub bool startup(void); bool shutdown(void); + bool registerNetworkMessageProcessor( + uint32_t publisherId, // publisher id + const std::string& networkMessageProcessorName // message bus member name + ); + bool deregisterNetworkMessageProcessor( + uint32_t publisherId + ); + private: OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread }; From 9f13b3b7a89ad661d51af4df02c42e885a9e6db7 Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 15:47:02 +0100 Subject: [PATCH 04/11] add receiver function --- .../MessageTransport/MsgTransPublisher.cpp | 19 ++++++++++++++++++- .../MessageTransport/MsgTransSubscriber.cpp | 18 +++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp index 8a3d43f90..89a89950d 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp @@ -15,6 +15,8 @@ Autor: Kai Huebl (kai@huebl-sgh.de) */ +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" #include "OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h" using namespace OpcUaStackCore; @@ -49,7 +51,22 @@ namespace OpcUaStackPubSub [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { // receive message from internal message bus - // FIXME: todo + auto event = boost::static_pointer_cast(message); + switch (event->eventType()) + { + case EventType::NetworkSendEvent: + { + NetworkSendEvent::SPtr event = boost::static_pointer_cast(message); + // FIXME: todo + break; + } + default: + { + Log(Error, "invalid message received in message transport module") + .parameter("ServiceName", serviceName_) + .parameter("Event", (uint32_t)event->eventType()); + } + } } ); } diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp index 767bf61e3..1224dd191 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp @@ -15,6 +15,7 @@ Autor: Kai Huebl (kai@huebl-sgh.de) */ #include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" #include "OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h" using namespace OpcUaStackCore; @@ -48,7 +49,22 @@ namespace OpcUaStackPubSub [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { // receive message from internal message bus - // FIXME: todo + auto event = boost::static_pointer_cast(message); + switch (event->eventType()) + { + case EventType::NetworkRecvEvent: + { + NetworkRecvEvent::SPtr event = boost::static_pointer_cast(message); + // FIXME: todo + break; + } + default: + { + Log(Error, "invalid message received in message transport module") + .parameter("ServiceName", serviceName_) + .parameter("Event", (uint32_t)event->eventType()); + } + } } ); } From 1ad233f6fc7208390435c8a6a618f9bb5d1eead9 Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 16:00:36 +0100 Subject: [PATCH 05/11] rename class --- ...ansSubscriber.cpp => MessageTransport.cpp} | 24 +++-- ...sgTransSubscriber.h => MessageTransport.h} | 28 +++--- .../MessageTransport/MsgTransPublisher.cpp | 95 ------------------- .../MessageTransport/MsgTransPublisher.h | 57 ----------- 4 files changed, 34 insertions(+), 170 deletions(-) rename src/OpcUaStackPubSub/MessageTransport/{MsgTransSubscriber.cpp => MessageTransport.cpp} (80%) rename src/OpcUaStackPubSub/MessageTransport/{MsgTransSubscriber.h => MessageTransport.h} (59%) delete mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp delete mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp similarity index 80% rename from src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp rename to src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp index 1224dd191..38c16561b 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp @@ -13,17 +13,21 @@ im Rahmen der Lizenz finden Sie in der Lizenz. Autor: Kai Huebl (kai@huebl-sgh.de) + */ + +#include "OpcUaStackPubSub/MessageTransport/MessageTransport.h" #include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" #include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" -#include "OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h" using namespace OpcUaStackCore; namespace OpcUaStackPubSub { - MsgTransSubscriber::MsgTransSubscriber( + MessageTransport::MessageTransport( + const std::string& connectionName, // message bus member name const std::string& serviceName, OpcUaStackCore::IOThread::SPtr& ioThread, OpcUaStackCore::MessageBus::SPtr& messageBus @@ -58,6 +62,12 @@ namespace OpcUaStackPubSub // FIXME: todo break; } + case EventType::NetworkSendEvent: + { + NetworkSendEvent::SPtr event = boost::static_pointer_cast(message); + // FIXME: todo + break; + } default: { Log(Error, "invalid message received in message transport module") @@ -69,7 +79,7 @@ namespace OpcUaStackPubSub ); } - MsgTransSubscriber::~MsgTransSubscriber(void) + MessageTransport::~MessageTransport(void) { // deactivate receiver deactivateReceiver(); @@ -77,21 +87,21 @@ namespace OpcUaStackPubSub } bool - MsgTransSubscriber::startup(void) + MessageTransport::startup(void) { // FIXME: todo return true; } bool - MsgTransSubscriber::shutdown(void) + MessageTransport::shutdown(void) { // FIXME: todo return true; } bool - MsgTransSubscriber::registerNetworkMessageProcessor( + MessageTransport::registerNetworkMessageProcessor( uint32_t publisherId, // publisher id const std::string& networkMessageProcessorName // message bus member name ) @@ -101,7 +111,7 @@ namespace OpcUaStackPubSub } bool - MsgTransSubscriber::deregisterNetworkMessageProcessor( + MessageTransport::deregisterNetworkMessageProcessor( uint32_t publisherId ) { diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h similarity index 59% rename from src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h rename to src/OpcUaStackPubSub/MessageTransport/MessageTransport.h index 43de15af5..864e35211 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h +++ b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h @@ -16,35 +16,41 @@ DESCRIPTION: - The message transport subscriber module receives a message from the communication - module via the internal message bus.The publisher ID is determined from the received - message and the assigned network message processor is determined. The received message - is forwarded to the assigned network message processor module via the internal message - bus. + 1. Subscriber Mode: + The message transport subscriber module receives a message from the communication + module via the internal message bus.The publisher ID is determined from the received + message and the assigned network message processor is determined. The received message + is forwarded to the assigned network message processor module via the internal message + bus. + 2. Publisher Mode + The message transport publisher module receives a message from one ore more + network message writer groups via the internal message bus. The received message + is forwarded to the assigned communication module via the internal message bus. */ -#ifndef __OpcUaStackPubSub_MsgTransSubscriber_h__ -#define __OpcUaStackPubSub_MsgTransSubscriber_h__ +#ifndef __OpcUaStackPubSub_MessageTransport_h__ +#define __OpcUaStackPubSub_MessageTransport_h__ #include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" namespace OpcUaStackPubSub { - class DLLEXPORT MsgTransSubscriber + class DLLEXPORT MessageTransport : public OpcUaStackCore::Object , public OpcUaStackServer::ServerServiceBase { public: - using SPtr = boost::shared_ptr; + using SPtr = boost::shared_ptr; - MsgTransSubscriber( + MessageTransport( + const std::string& connectionName, // message bus member name const std::string& serviceName, OpcUaStackCore::IOThread::SPtr& ioThread, OpcUaStackCore::MessageBus::SPtr& messageBus ); - ~MsgTransSubscriber(void); + ~MessageTransport(void); bool startup(void); bool shutdown(void); diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp deleted file mode 100644 index 89a89950d..000000000 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* - Copyright 2022 Kai Huebl (kai@huebl-sgh.de) - - Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser - Datei nur in Übereinstimmung mit der Lizenz erlaubt. - Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. - - Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, - erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE - GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. - - Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen - im Rahmen der Lizenz finden Sie in der Lizenz. - - Autor: Kai Huebl (kai@huebl-sgh.de) - */ - -#include "OpcUaStackCore/Base/Log.h" -#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" -#include "OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h" - -using namespace OpcUaStackCore; - -namespace OpcUaStackPubSub -{ - - MsgTransPublisher::MsgTransPublisher( - const std::string& connectionName, - const std::string& serviceName, - OpcUaStackCore::IOThread::SPtr& ioThread, - OpcUaStackCore::MessageBus::SPtr& messageBus - ) - : ServerServiceBase() - { - // set parameter - ioThread_ = ioThread; - - // set parameter in server service base - serviceName_ = serviceName; - ServerServiceBase::ioThread_ = ioThread.get(); - strand_ = ioThread->createStrand(); - messageBus_ = messageBus; - - // register message bus receiver - MessageBusMemberConfig messageBusMemberConfig; - messageBusMemberConfig.strand(strand_); - messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); - - // activate receiver - activateReceiver( - [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { - // receive message from internal message bus - - auto event = boost::static_pointer_cast(message); - switch (event->eventType()) - { - case EventType::NetworkSendEvent: - { - NetworkSendEvent::SPtr event = boost::static_pointer_cast(message); - // FIXME: todo - break; - } - default: - { - Log(Error, "invalid message received in message transport module") - .parameter("ServiceName", serviceName_) - .parameter("Event", (uint32_t)event->eventType()); - } - } - } - ); - } - - MsgTransPublisher::~MsgTransPublisher(void) - { - // deactivate receiver - deactivateReceiver(); - messageBus_->deregisterMember(messageBusMember_); - } - - bool - MsgTransPublisher::startup(void) - { - // FIXME: todo - return true; - } - - bool - MsgTransPublisher::shutdown(void) - { - // FIXME: todo - return true; - } - -} diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h deleted file mode 100644 index 087cd16c7..000000000 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - Copyright 2022 Kai Huebl (kai@huebl-sgh.de) - - Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser - Datei nur in Übereinstimmung mit der Lizenz erlaubt. - Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. - - Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, - erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE - GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. - - Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen - im Rahmen der Lizenz finden Sie in der Lizenz. - - Autor: Kai Huebl (kai@huebl-sgh.de) - - DESCRIPTION: - - The message transport publisher module receives a message from one ore more - network message writer groups via the internal message bus. The received message - is forwarded to the assigned communication module via the internal message bus. - - */ - -#ifndef __OpcUaStackPubSub_MsgTransPublisher_h__ -#define __OpcUaStackPubSub_MsgTransPublisher_h__ - -#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" - -namespace OpcUaStackPubSub -{ - - class DLLEXPORT MsgTransPublisher - : public OpcUaStackCore::Object - , public OpcUaStackServer::ServerServiceBase - { - public: - using SPtr = boost::shared_ptr; - - MsgTransPublisher( - const std::string& connectionName, // message bus member name - const std::string& serviceName, - OpcUaStackCore::IOThread::SPtr& ioThread, - OpcUaStackCore::MessageBus::SPtr& messageBus - ); - ~MsgTransPublisher(void); - - bool startup(void); - bool shutdown(void); - - private: - OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread - }; - -} - -#endif From 04c3ba7debb5a4b1cac403c7db3f964e169d2437 Mon Sep 17 00:00:00 2001 From: Upendar Reddy Sama Date: Sun, 29 May 2022 23:50:12 +0200 Subject: [PATCH 06/11] MessageTransport Network Send and Recv Events handled NetworkMessage class has been added Temporary code for publishloop has been added unit test for MessageTransport has been added NetworkMessageProcessor class has been added --- .../DataSetReader/DataSetReaderBase.cpp | 50 ++++ .../DataSetReader/DataSetReaderBase.h | 50 ++++ .../MessageTransport/MessageTransport.cpp | 56 ++++- .../MessageTransport/MessageTransport.h | 3 + .../NetworkMessage/GroupHeader.cpp | 84 +++++++ .../NetworkMessage/GroupHeader.h | 48 ++++ .../NetworkMessage/NetworkMessage.cpp | 71 ++++++ .../NetworkMessage/NetworkMessage.h | 48 ++++ .../NetworkMessage/NetworkMessageHeader.cpp | 93 ++++++++ .../NetworkMessage/NetworkMessageHeader.h | 45 ++++ .../NetworkMessageProcessor.cpp | 221 ++++++++++++++++++ .../NetworkMessageProcessor.h | 70 ++++++ .../NetworkMessageWriterGroup.cpp | 83 +++++-- .../NetworkMessageWriterGroup.h | 13 +- .../Connection/MessageTransport_t.cpp | 123 ++++++++++ 15 files changed, 1036 insertions(+), 22 deletions(-) create mode 100644 src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp create mode 100644 src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h create mode 100644 src/OpcUaStackPubSub/NetworkMessage/GroupHeader.cpp create mode 100644 src/OpcUaStackPubSub/NetworkMessage/GroupHeader.h create mode 100644 src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.cpp create mode 100644 src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.h create mode 100644 src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.cpp create mode 100644 src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h create mode 100644 src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.cpp create mode 100644 src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h create mode 100644 tst/OpcUaStackPubSub/Connection/MessageTransport_t.cpp diff --git a/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp b/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp new file mode 100644 index 000000000..3438b01ff --- /dev/null +++ b/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp @@ -0,0 +1,50 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de), Aleksey Timin (atimin@gmail.com) + */ + +#include "OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h" + +namespace OpcUaStackPubSub +{ + + DataSetReaderBase::DataSetReaderBase(void) + { + } + + DataSetReaderBase::~DataSetReaderBase(void) + { + } + + void + DataSetReaderBase::dataSetReaderId(uint32_t dataSetReaderId) + { + dataSetReaderId_ = dataSetReaderId; + } + + uint32_t + DataSetReaderBase::dataSetReaderId(void) + { + return dataSetReaderId_; + } + + bool + DataSetReaderBase::encodeDataSet(boost::asio::streambuf& dataSet, uint16_t& size) + { + // FIXME: todo + return true; + } + +} diff --git a/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h b/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h new file mode 100644 index 000000000..a3ff0cf34 --- /dev/null +++ b/src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h @@ -0,0 +1,50 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de), Aleksey Timin (atimin@gmail.com) + */ + +#ifndef __OpcUaStackPubSub_DataSetReaderBase_h__ +#define __OpcUaStackPubSub_DataSetReaderBase_h__ + +#include + +#include + +#include "OpcUaStackCore/Base/os.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT DataSetReaderBase + { + public: + typedef boost::shared_ptr SPtr; + typedef std::map Map; + + DataSetReaderBase(void); + virtual ~DataSetReaderBase(void); + + void dataSetReaderId(uint32_t dataSetReaderId); + uint32_t dataSetReaderId(void); + + bool encodeDataSet(boost::asio::streambuf& dataSet, uint16_t& size); + + public: + uint32_t dataSetReaderId_ = 0; // unique data set writer id + }; + +} + +#endif diff --git a/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp index 38c16561b..454e991fc 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp @@ -17,6 +17,7 @@ */ #include "OpcUaStackPubSub/MessageTransport/MessageTransport.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" #include "OpcUaStackCore/Base/Log.h" #include "OpcUaStackPubSub/Events/NetworkSendEvent.h" #include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" @@ -42,6 +43,7 @@ namespace OpcUaStackPubSub ServerServiceBase::ioThread_ = ioThread.get(); strand_ = ioThread->createStrand(); messageBus_ = messageBus; + connectionName_ = connectionName; // register message bus receiver MessageBusMemberConfig messageBusMemberConfig; @@ -50,7 +52,7 @@ namespace OpcUaStackPubSub // activate receiver activateReceiver( - [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + [this, &messageBusMemberConfig](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { // receive message from internal message bus auto event = boost::static_pointer_cast(message); @@ -59,13 +61,29 @@ namespace OpcUaStackPubSub case EventType::NetworkRecvEvent: { NetworkRecvEvent::SPtr event = boost::static_pointer_cast(message); - // FIXME: todo + + std::iostream ios(&(event->streamBuf())); + NetworkMessage networkMessage; + networkMessage.opcUaBinaryDecode(ios); + + auto publisherId = networkMessage.networkMessageHeader()->publisherId(); + + auto it = networkMessageProcessorMap_.find(publisherId); + if (it != networkMessageProcessorMap_.end()) { + auto readerGroupBusMember = messageBus_->registerMember(it->second, messageBusMemberConfig); + messageBus_->messageSend(messageBusMember_, readerGroupBusMember, event); + } else { + Log(Error, "network message processor does not exist for this networkmessage") + .parameter("PublisherId", publisherId); + } + break; } case EventType::NetworkSendEvent: { + Log(Info, "Recieved NetworkSendEvent"); NetworkSendEvent::SPtr event = boost::static_pointer_cast(message); - // FIXME: todo + messageBus_->messageSend(messageBusMember_, connectionBusMember_, event); break; } default: @@ -89,14 +107,21 @@ namespace OpcUaStackPubSub bool MessageTransport::startup(void) { - // FIXME: todo + // get reference to connection from message bus + if (!messageBus_->existMember(connectionName_)) { + Log(Error, "udp connection message bus member don't exist") + .parameter("UdpConnectionName", connectionName_); + return false; + } + connectionBusMember_ = messageBus_->getMember(connectionName_); + return true; } bool MessageTransport::shutdown(void) { - // FIXME: todo + // FIXME: todo return true; } @@ -106,7 +131,16 @@ namespace OpcUaStackPubSub const std::string& networkMessageProcessorName // message bus member name ) { - // FIXME: todo + auto it = networkMessageProcessorMap_.find(publisherId); + if (it != networkMessageProcessorMap_.end()) { + Log(Error, "register network message processor error, because network message processor d already exist") + .parameter("MessageTransport", serviceName_) + .parameter("NetworkMessageProcessor PublisherId", publisherId); + return false; + } + + // add network message processor to map + networkMessageProcessorMap_.insert(std::make_pair(publisherId, networkMessageProcessorName)); return true; } @@ -115,7 +149,15 @@ namespace OpcUaStackPubSub uint32_t publisherId ) { - // FIXME: todo + auto it = networkMessageProcessorMap_.find(publisherId); + if (it == networkMessageProcessorMap_.end()) { + Log(Error, "deregister network message processor error, because network message processor does not exist") + .parameter("MessageTransport", serviceName_) + .parameter("NetworkMessageProcessor PublisherId", publisherId); + return false; + } + + networkMessageProcessorMap_.erase(it); return true; } diff --git a/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h index 864e35211..07e0261b0 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h +++ b/src/OpcUaStackPubSub/MessageTransport/MessageTransport.h @@ -65,6 +65,9 @@ namespace OpcUaStackPubSub private: OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread + std::string connectionName_; + OpcUaStackCore::MessageBusMember::WPtr connectionBusMember_; + std::map networkMessageProcessorMap_; }; } diff --git a/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.cpp b/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.cpp new file mode 100644 index 000000000..e577e71b5 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.cpp @@ -0,0 +1,84 @@ +#include "OpcUaStackPubSub/NetworkMessage/GroupHeader.h" + +namespace OpcUaStackPubSub +{ + + OpcUaByte& GroupHeader::groupFlags() + { + return groupFlags_; + } + void GroupHeader::groupFlags(OpcUaByte& groupFlags) + { + groupFlags_ = groupFlags; + } + + OpcUaUInt16& GroupHeader::writerGroupId() + { + return writerGroupId_; + } + void GroupHeader::writerGroupId(OpcUaUInt16& writerGroupId) + { + writerGroupId_ = writerGroupId; + } + + OpcUaByte& GroupHeader::groupVersion() + { + return groupVersion_; + } + + void GroupHeader::groupVersion(OpcUaByte& groupVersion) + { + groupVersion_ = groupVersion; + } + + OpcUaUInt16& GroupHeader::networkMessageNumber() + { + return networkMessageNumber_; + } + void GroupHeader::networkMessageNumber(OpcUaUInt16& networkMessageNumber) + { + networkMessageNumber_ = networkMessageNumber; + } + OpcUaUInt16& GroupHeader::sequenceNumber() + { + return sequenceNumber_; + } + void GroupHeader::sequenceNumber(OpcUaUInt16& sequenceNumber) + { + sequenceNumber_ = sequenceNumber; + } + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // + // GroupHeader Encoding and Decoding + // + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + bool + GroupHeader::opcUaBinaryEncode(std::ostream& os) const + { + bool rc = true; + rc &= OpcUaNumber::opcUaBinaryEncode(os, groupFlags_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, writerGroupId_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, groupVersion_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, networkMessageNumber_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, sequenceNumber_); + + return rc; + } + bool + GroupHeader::opcUaBinaryDecode(std::istream& is) + { + bool rc = true; + rc &= OpcUaNumber::opcUaBinaryDecode(is, groupFlags_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, writerGroupId_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, groupVersion_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, networkMessageNumber_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, sequenceNumber_); + return rc; + } + + + + +} \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.h b/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.h new file mode 100644 index 000000000..0b5562095 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/GroupHeader.h @@ -0,0 +1,48 @@ +#ifndef __OpcUaStackPubSub_GroupHeader_h__ +#define __OpcUaStackPubSub_GroupHeader_h__ + +#include "OpcUaStackCore/Base/os.h" +#include "OpcUaStackCore/BuildInTypes/OpcUaGuid.h" +#include "OpcUaStackCore/Base/Utility.h" +#include + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + class GroupHeader + { + public: + using SPtr = boost::shared_ptr; + GroupHeader() = default; + virtual ~GroupHeader() = default; + bool opcUaBinaryEncode(std::ostream& os) const; + bool opcUaBinaryDecode(std::istream& is); + + OpcUaByte& groupFlags(); + void groupFlags(OpcUaByte& groupFlags); + + OpcUaUInt16& writerGroupId(); + void writerGroupId(OpcUaUInt16& writerGroupId); + + OpcUaByte& groupVersion(); + void groupVersion(OpcUaByte& groupVersion); + + OpcUaUInt16& networkMessageNumber(); + void networkMessageNumber(OpcUaUInt16& networkMessageNumber); + + OpcUaUInt16& sequenceNumber(); + void sequenceNumber(OpcUaUInt16& sequenceNumber); + + private: + OpcUaByte groupFlags_; + OpcUaUInt16 writerGroupId_; + OpcUaByte groupVersion_; + OpcUaUInt16 networkMessageNumber_; + OpcUaUInt16 sequenceNumber_; + }; + +} + +#endif \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.cpp b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.cpp new file mode 100644 index 000000000..e93101ec5 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.cpp @@ -0,0 +1,71 @@ +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" + +namespace OpcUaStackPubSub +{ + + NetworkMessage::NetworkMessage() + : Object() + , networkMessageHeader_(boost::make_shared()) + , groupHeader_(boost::make_shared()) + { + + } + + NetworkMessage::~NetworkMessage() + { + + } + + void NetworkMessage::addPayLoadItem(uint32_t& dataSetMessage) + { + payLoad_.push_back(dataSetMessage); + } + + + NetworkMessageHeader::SPtr NetworkMessage::networkMessageHeader() + { + return networkMessageHeader_; + } + + void NetworkMessage::networkMessageHeader(const NetworkMessageHeader::SPtr networkMessageHeader) + { + networkMessageHeader_ = networkMessageHeader; + } + + GroupHeader::SPtr NetworkMessage::groupHeader() + { + return groupHeader_; + } + + void NetworkMessage::groupHeader(const GroupHeader::SPtr groupHeader) + { + groupHeader_ = groupHeader; + } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // + // NetworkMessage Encoding and Decoding + // + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + bool + NetworkMessage::opcUaBinaryEncode(std::ostream& os) const + { + bool rc = true; + rc &= networkMessageHeader_->opcUaBinaryEncode(os); + rc &= groupHeader_->opcUaBinaryEncode(os); + rc &= payLoad_.opcUaBinaryEncode(os); + return rc; + } + + bool + NetworkMessage::opcUaBinaryDecode(std::istream& is) + { + bool rc = true; + rc &= networkMessageHeader_->opcUaBinaryDecode(is); + rc &= groupHeader_->opcUaBinaryDecode(is);; + rc &= payLoad_.opcUaBinaryDecode(is); + return rc; + } +} \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.h b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.h new file mode 100644 index 000000000..09e672348 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessage.h @@ -0,0 +1,48 @@ +#ifndef __OpcUaStackPubSub_NetworkMessage_h__ +#define __OpcUaStackPubSub_NetworkMessage_h__ + +#include "OpcUaStackCore/Base/os.h" +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" +#include "OpcUaStackPubSub/DataSetWriter/DataSetWriterBase.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h" +#include "OpcUaStackPubSub/NetworkMessage/GroupHeader.h" +#include "OpcUaStackCore/BuildInTypes/OpcUaGuid.h" +#include "OpcUaStackCore/Base/Utility.h" +#include + +using namespace OpcUaStackCore; +class NetworkMessageHeader; +class GroupHeader; + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT NetworkMessage + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + NetworkMessage(); + virtual ~NetworkMessage(); + + void addPayLoadItem(uint32_t& dataSetMessage); + NetworkMessageHeader::SPtr networkMessageHeader(); + void networkMessageHeader(const NetworkMessageHeader::SPtr networkMessageHeader); + GroupHeader::SPtr groupHeader(); + void groupHeader(const GroupHeader::SPtr groupHeader); + bool opcUaBinaryEncode(std::ostream& os) const; + bool opcUaBinaryDecode(std::istream& is); + + private: + NetworkMessageHeader::SPtr networkMessageHeader_; + GroupHeader::SPtr groupHeader_; + OpcUaInt32Array payLoad_; + //std::vector payLoad_; //actual payload datatype is DataSetMessage + }; +} + +#endif \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.cpp b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.cpp new file mode 100644 index 000000000..f7a23ad08 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.cpp @@ -0,0 +1,93 @@ +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h" + +namespace OpcUaStackPubSub +{ + + OpcUaByte& NetworkMessageHeader::uadpVersionAndFlags() + { + //OpcUaByte uadpVersionAndFlags; + //uadpVersionAndFlags &= uadpVersion_; + //uadpVersionAndFlags = (uadpVersionAndFlags<<4) & uadpFlags_; + return uadpVersionAndFlags_; + } + + void NetworkMessageHeader::uadpVersionAndFlags(const OpcUaByte& uadpVersionAndFlags) + { + //uadpVersion_ = uadpVersionAndFlags & 0x0F; + //uadpFlags_ = uadpVersionAndFlags>>4 & 0x0F; + uadpVersionAndFlags_ = uadpVersionAndFlags; + } + + OpcUaByte& NetworkMessageHeader::extendedFlags1() + { + return extendedFlags1_; + } + void NetworkMessageHeader::extendedFlags1(const OpcUaByte& extendedFlags1) + { + extendedFlags1_ = extendedFlags1; + } + + OpcUaByte& NetworkMessageHeader::extendedFlags2() + { + return extendedFlags2_; + } + void NetworkMessageHeader::extendedFlags2(const OpcUaByte& extendedFlags2) + { + extendedFlags2_ = extendedFlags2; + } + + OpcUaUInt32& NetworkMessageHeader::publisherId() + { + return publisherId_; + } + void NetworkMessageHeader::publisherId(const OpcUaUInt32& publisherId) + { + publisherId_ = publisherId; + } + + OpcUaGuid& NetworkMessageHeader::dataSetClassId() + { + return dataSetClassId_; + } + + void NetworkMessageHeader::dataSetClassId(const OpcUaGuid& dataSetClassId) + { + dataSetClassId_ = dataSetClassId; + } + + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + // + // NetworkMessageHeader Encoding and Decoding + // + // ------------------------------------------------------------------------ + // ------------------------------------------------------------------------ + bool + NetworkMessageHeader::opcUaBinaryEncode(std::ostream& os) const + { + bool rc = true; + rc &= OpcUaNumber::opcUaBinaryEncode(os, uadpVersionAndFlags_); + //rc &= OpcUaNumber::opcUaBinaryEncode(os, uadpFlags_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, extendedFlags1_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, extendedFlags2_); + rc &= OpcUaNumber::opcUaBinaryEncode(os, publisherId_); + rc &= dataSetClassId_.opcUaBinaryEncode(os); + return rc; + } + + bool + NetworkMessageHeader::opcUaBinaryDecode(std::istream& is) + { + bool rc = true; + //OpcUaByte versionFlags; + rc &= OpcUaNumber::opcUaBinaryDecode(is, uadpVersionAndFlags_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, extendedFlags1_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, extendedFlags2_); + rc &= OpcUaNumber::opcUaBinaryDecode(is, publisherId_); + //uadpVersion_ = versionFlags & 0x0F; + //uadpFlags_ = versionFlags>>4 & 0x0F; + return rc; + } + + +} \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h new file mode 100644 index 000000000..dfd695b16 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h @@ -0,0 +1,45 @@ +#ifndef __OpcUaStackPubSub_NetworkMessageHeader_h__ +#define __OpcUaStackPubSub_NetworkMessageHeader_h__ + +#include "OpcUaStackCore/BuildInTypes/OpcUaGuid.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + class NetworkMessageHeader + { + public: + using SPtr = boost::shared_ptr; + NetworkMessageHeader() = default; + virtual ~NetworkMessageHeader() = default; + bool opcUaBinaryEncode(std::ostream& os) const; + bool opcUaBinaryDecode(std::istream& is); + + OpcUaByte& uadpVersionAndFlags(); + void uadpVersionAndFlags(const OpcUaByte& uadpVersionAndFlags); + + OpcUaByte& extendedFlags1(); + void extendedFlags1(const OpcUaByte& extendedFlags1); + + OpcUaByte& extendedFlags2(); + void extendedFlags2(const OpcUaByte& extendedFlags2); + + OpcUaUInt32& publisherId(); + void publisherId(const OpcUaUInt32& publisherId); + + OpcUaGuid& dataSetClassId(); + void dataSetClassId(const OpcUaGuid& dataSetClassId); + + + private: + OpcUaByte uadpVersionAndFlags_ = 1; + //OpcUaByte uadpFlags_ :4; + OpcUaByte extendedFlags1_ = 0; + OpcUaByte extendedFlags2_ = 0; + OpcUaUInt32 publisherId_ = 0; + OpcUaGuid dataSetClassId_; + }; +} + +#endif \ No newline at end of file diff --git a/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.cpp b/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.cpp new file mode 100644 index 000000000..a4446b93b --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.cpp @@ -0,0 +1,221 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + */ + +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + NetworkMessageProcessor::NetworkMessageProcessor( + const std::string& messageProcessorName, + const std::string& messageTransportName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ) + : ServerServiceBase() + { + Log(Info, "Constructor") + .parameter("ServiceName", serviceName_); + + messageTransportName_ = messageTransportName; + + // set parameter in server service base + serviceName_ = messageProcessorName; + ServerServiceBase::ioThread_ = ioThread.get(); + strand_ = ioThread->createStrand(); + messageBus_ = messageBus; + + // register message bus receiver + MessageBusMemberConfig messageBusMemberConfig; + messageBusMemberConfig.strand(strand_); + messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); + + // activate receiver + activateReceiver( + [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + // receive message from internal message bus + auto event = boost::static_pointer_cast(message); + switch (event->eventType()) + { + case EventType::NetworkRecvEvent: + { + break; + } + case EventType::NetworkSendEvent: + { + break; + } + }; + } + ); + } + + NetworkMessageProcessor::~NetworkMessageProcessor(void) + { + // deactivate receiver + deactivateReceiver(); + messageBus_->deregisterMember(messageBusMember_); + } + + void + NetworkMessageProcessor::readerGroupId(uint16_t readerGroupId) + { + readerGroupId_ = readerGroupId; + } + + bool + NetworkMessageProcessor::registerDataSetReader(uint16_t dataSetReaderId, DataSetReaderBase::SPtr& dataSetReader) + { + // check if data set reader already exist + auto it = dataSetReaderMap_.find(dataSetReaderId); + if (it != dataSetReaderMap_.end()) { + Log(Error, "register data set writer error, because data set writer id already exist") + .parameter("ReaderGroupName", serviceName_) + .parameter("DataSetReaderId", dataSetReaderId); + return false; + } + // add data set writer to map + dataSetReaderMap_.insert(std::make_pair(dataSetReaderId, dataSetReader)); + return true; + } + + bool + NetworkMessageProcessor::registerDataSetReaderSync(uint16_t dataSetReaderId, DataSetReaderBase::SPtr& dataSetReader) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise, dataSetReaderId, dataSetReader](void) mutable { + bool rc = registerDataSetReader(dataSetReaderId, dataSetReader); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return registerDataSetReader(dataSetReaderId, dataSetReader); + } + + bool + NetworkMessageProcessor::deregisterDataSetReader(uint16_t dataSetReaderId) + { + // check if data set writer exist + auto it = dataSetReaderMap_.find(dataSetReaderId); + if (it == dataSetReaderMap_.end()) { + Log(Error, "deregister data set writer error, because data set writer id not exist") + .parameter("ReaderGroupName", serviceName_) + .parameter("DataSetReaderId", dataSetReaderId); + return false; + } + + dataSetReaderMap_.erase(it); + return true; + } + + bool + NetworkMessageProcessor::startupSync(void) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise](void) mutable { + bool rc = startup(); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return startup(); + } + + bool + NetworkMessageProcessor::shutdownSync(void) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise](void) mutable { + bool rc = shutdown(); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return shutdown(); + } + bool + NetworkMessageProcessor::deregisterDataSetReaderSync(uint16_t dataSetReaderId) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise, dataSetReaderId](void) mutable { + bool rc = deregisterDataSetReader(dataSetReaderId); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return deregisterDataSetReaderSync(dataSetReaderId); + } + + bool + NetworkMessageProcessor::startup(void) + { + // check if the call is made within the strand + // FIXME: todo + Log(Info, "Startup").parameter("ServiceName", serviceName_); + + // get reference to message transport bus member + if (!messageBus_->existMember(messageTransportName_)) { + Log(Error, "message transport message bus member don't exist") + .parameter("MessageTransportName", messageTransportName_); + return false; + } + messageTransportBusMember_ = messageBus_->getMember(messageTransportName_); + + return true; + } + + bool + NetworkMessageProcessor::shutdown(void) + { + // check if the call is made within the strand + // FIXME: todo + + return true; + } +} diff --git a/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h b/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h new file mode 100644 index 000000000..28b3a4ed5 --- /dev/null +++ b/src/OpcUaStackPubSub/NetworkMessageProcessor/NetworkMessageProcessor.h @@ -0,0 +1,70 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + */ + +#ifndef __OpcUaStackPubSub_NetworkMessageProcessor_h__ +#define __OpcUaStackPubSub_NetworkMessageProcessor_h__ + +#include + +#include "OpcUaStackCore/Base/os.h" +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" +#include "OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT NetworkMessageProcessor + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + NetworkMessageProcessor( + const std::string& messageProcessorName, + const std::string& messageTransportName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ); + virtual ~NetworkMessageProcessor(void); + + void readerGroupId(uint16_t readerGroupId); + bool registerDataSetReaderSync(uint16_t dataSetReaderId, DataSetReaderBase::SPtr& dataSetReader); + bool deregisterDataSetReaderSync(uint16_t dataSetReaderId); + + bool startupSync(void); + bool shutdownSync(void); + + private: + uint16_t readerGroupId_ = 0; + std::string messageTransportName_ = ""; + OpcUaStackCore::MessageBusMember::WPtr messageTransportBusMember_; + + DataSetReaderBase::Map dataSetReaderMap_; + + bool startup(); + bool shutdown(); + + bool registerDataSetReader(uint16_t dataSetReaderId, DataSetReaderBase::SPtr& dataSetReader); + bool deregisterDataSetReader(uint16_t dataSetReaderId); + }; + +} + +#endif diff --git a/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.cpp b/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.cpp index a776230a8..a96379166 100644 --- a/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.cpp +++ b/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.cpp @@ -17,6 +17,7 @@ #include "OpcUaStackCore/Base/Log.h" #include "OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" using namespace OpcUaStackCore; @@ -31,6 +32,9 @@ namespace OpcUaStackPubSub ) : ServerServiceBase() { + Log(Info, "Constructor") + .parameter("ServiceName", serviceName_); + messageTransportName_ = messageTransportName; // set parameter in server service base @@ -78,7 +82,6 @@ namespace OpcUaStackPubSub .parameter("DataSetWriterId", dataSetWriterId); return false; } - // add data set writer to map dataSetWriterMap_.insert(std::make_pair(dataSetWriterId, dataSetWriter)); return true; @@ -139,12 +142,11 @@ namespace OpcUaStackPubSub return deregisterDataSetWriterSync(dataSetWriterId); } - + bool - NetworkMessageWriterGroup::startupSync(void) + NetworkMessageWriterGroup::startup(void) { - // check if the call is made within the strand - // FIXME: todo + Log(Info, "Startup").parameter("ServiceName", serviceName_); // get reference to message transport bus member if (!messageBus_->existMember(messageTransportName_)) { @@ -152,26 +154,63 @@ namespace OpcUaStackPubSub .parameter("MessageTransportName", messageTransportName_); return false; } + messageTransportBusMember_ = messageBus_->getMember(messageTransportName_); // start publish loop - // FIXME: todo + while(!leavePublishLoop_) { + publishLoop(); + boost::this_thread::sleep(boost::posix_time::seconds(1)); + } + return true; + } - // FIXME: todo + bool + NetworkMessageWriterGroup::shutdown(void) + { + // stop publish loop + leavePublishLoop_ = true; return true; } bool - NetworkMessageWriterGroup::shutdownSync(void) + NetworkMessageWriterGroup::startupSync(void) { // check if the call is made within the strand - // FIXME: todo + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise](void) mutable { + bool rc = startup(); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } - // stop publish loop - // FIXME: todo + return startup(); + } - // FIXME: todo - return true; + bool + NetworkMessageWriterGroup::shutdownSync(void) + { + // check if the call is made within the strand + if (!strand_->running_in_this_thread()) { + std::promise promise; + std::future future = promise.get_future(); + strand_->dispatch( + [this, &promise](void) mutable { + bool rc = shutdown(); + promise.set_value(rc); + } + ); + future.wait(); + return future.get(); + } + + return shutdown(); } void @@ -198,8 +237,26 @@ namespace OpcUaStackPubSub // create network message // FIXME: todo + auto networkMessage = boost::make_shared(); + auto networkMessageHeader = boost::make_shared(); + + OpcUaByte uadpVersionAndFlags; + uadpVersionAndFlags = 0b0001; //Flags value --> only publisherId(bit 4) is enabled + uadpVersionAndFlags = uadpVersionAndFlags<<4 & 1; //version: default 1; + networkMessageHeader->uadpVersionAndFlags(uadpVersionAndFlags); + + OpcUaInt32 publisherId = 25; //temporary data + networkMessageHeader->publisherId(publisherId); + + networkMessage->networkMessageHeader(networkMessageHeader); + uint32_t dataSetMessage = 2525; //temporary data + networkMessage->addPayLoadItem(dataSetMessage); // send network message to message transport module + auto event = boost::make_shared(); + std::ostream os(&event->streamBuf()); + networkMessage->opcUaBinaryEncode(os); + messageBus_->messageSend(messageBusMember_, messageTransportBusMember_, event); } } diff --git a/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h b/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h index 36e5a40d6..90b0f271d 100644 --- a/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h +++ b/src/OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h @@ -23,6 +23,8 @@ #include "OpcUaStackCore/Base/os.h" #include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" #include "OpcUaStackPubSub/DataSetWriter/DataSetWriterBase.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" namespace OpcUaStackPubSub { @@ -32,7 +34,7 @@ namespace OpcUaStackPubSub , public OpcUaStackServer::ServerServiceBase { public: - typedef boost::shared_ptr SPtr; + using SPtr = boost::shared_ptr; NetworkMessageWriterGroup( const std::string& writerGroupName, @@ -49,18 +51,25 @@ namespace OpcUaStackPubSub bool startupSync(void); bool shutdownSync(void); - public: + private: uint16_t writerGroupId_ = 0; std::string messageTransportName_ = ""; OpcUaStackCore::MessageBusMember::WPtr messageTransportBusMember_; + bool leavePublishLoop_ = false; DataSetWriterBase::Map dataSetWriterMap_; + bool startup(); + bool shutdown(); bool registerDataSetWriter(uint16_t dataSetWriterId, DataSetWriterBase::SPtr& dataSetWriter); bool deregisterDataSetWriter(uint16_t dataSetWriterId); void publishLoop(void); + }; + + + } #endif diff --git a/tst/OpcUaStackPubSub/Connection/MessageTransport_t.cpp b/tst/OpcUaStackPubSub/Connection/MessageTransport_t.cpp new file mode 100644 index 000000000..d1cdcb36b --- /dev/null +++ b/tst/OpcUaStackPubSub/Connection/MessageTransport_t.cpp @@ -0,0 +1,123 @@ +#include "unittest.h" + +#include + +#include "OpcUaStackPubSub/Connection/UDPConnection.h" +#include "OpcUaStackPubSub/MessageTransport/MessageTransport.h" +#include "OpcUaStackPubSub/NetworkMessageWriterGroup/NetworkMessageWriterGroup.h" +#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h" +//#include "OpcUaStackPubSub/NetworkMessage/NetworkMessageHeader.h" + +using namespace OpcUaStackCore; +using namespace OpcUaStackPubSub; + +class DLLEXPORT TestUDPConnection +{ + public: + + TestUDPConnection(std::promise* promise) { + promise_ = promise; + } + + ~TestUDPConnection(void) { + } + + void + receive(MessageBusError error, const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + event_ = boost::static_pointer_cast(message); + promise_->set_value(); + } + + std::promise* promise_ = nullptr; + NetworkRecvEvent::SPtr event_; +}; + +BOOST_AUTO_TEST_SUITE(MessageTransport_) + +BOOST_AUTO_TEST_CASE(MessageTransport_) +{ + std::cout << "MessageTransport_t" << std::endl; +} + + +BOOST_AUTO_TEST_CASE(MessageTransport_send_receive_message) +{ + // start thread pool + auto ioThread = boost::make_shared("TestThread1"); + ioThread->numberThreads(6); + BOOST_REQUIRE(ioThread->startup() == true); + + // start message bus + MessageBusConfig messageBusConfig; + messageBusConfig.ioThread(ioThread); + MessageBus::SPtr messageBus = boost::make_shared(messageBusConfig); + + // register sender and receiver + auto udpConnectionMember = messageBus->registerMember("UDPConnection"); + auto messageTransportMember = messageBus->registerMember("MessageTransport"); + auto receiver = messageBus->registerMember("receiver"); + + // create udp connection + auto udpConnection = boost::make_shared( + "127.0.0.1", // receiver address + 5678, // receiver port + "127.0.0.1", // target address + 5679, // target port + "MessageTransport", // message bus name of the message transport module + "UDPConnection", // message bus name of the udp connection module + ioThread, // thread + messageBus // message bus + ); + udpConnection->startup(); + + auto messageTransport = boost::make_shared( + "receiver", + "MessageTransport", + ioThread, + messageBus + ); + messageTransport->startup(); + + // receive + std::promise promise; + auto future = promise.get_future(); + TestUDPConnection testCon(&promise); + + messageBus->messageReceive( + receiver, + [&testCon](MessageBusError error, const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) mutable { + testCon.receive(error, handleFrom, message); + } + ); + + // send network message to message transport module + auto event = boost::make_shared(); + std::ostream os(&event->streamBuf()); + os << "Dies ist eine Message"; + + messageBus->messageSend(udpConnectionMember, messageTransportMember, event); + + // wait for receiver to finish + BOOST_REQUIRE(future.wait_for(std::chrono::seconds(1)) == std::future_status::ready); + + // check received message + boost::asio::streambuf::const_buffers_type bufs = testCon.event_->streamBuf().data(); + std::string str( + boost::asio::buffers_begin(bufs), + boost::asio::buffers_begin(bufs) + testCon.event_->streamBuf().size() + ); + + BOOST_REQUIRE(str == std::string("Dies ist eine Message")); + + messageTransport->shutdown(); + messageTransport.reset(); + + // delete udp connection + udpConnection->shutdown(); + udpConnection.reset(); + + // stop thread pool + BOOST_REQUIRE(ioThread->shutdown() == true); +} + +BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file From 4266d98bc6858d8b276c0971e5eab324a88c10b1 Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 15:07:58 +0100 Subject: [PATCH 07/11] add class MsgTransProvider --- .../MessageTransport/MsgTransProvider.cpp | 77 +++++++++++++++++++ .../MessageTransport/MsgTransProvider.h | 50 ++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp create mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp new file mode 100644 index 000000000..f70fbc94e --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp @@ -0,0 +1,77 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + */ +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/MessageTransport/MsgTransProvider.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + MsgTransProvider::MsgTransProvider( + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ) + : ServerServiceBase() + { + // set parameter + ioThread_ = ioThread; + + // set parameter in server service base + serviceName_ = serviceName; + ServerServiceBase::ioThread_ = ioThread.get(); + strand_ = ioThread->createStrand(); + messageBus_ = messageBus; + + // register message bus receiver + MessageBusMemberConfig messageBusMemberConfig; + messageBusMemberConfig.strand(strand_); + messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); + + // activate receiver + activateReceiver( + [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + // receive message from internal message bus + + // FIXME: todo + } + ); + } + + MsgTransProvider::~MsgTransProvider(void) + { + // deactivate receiver + deactivateReceiver(); + messageBus_->deregisterMember(messageBusMember_); + } + + bool + MsgTransProvider::startup(void) + { + // FIXME: todo + return true; + } + + bool + MsgTransProvider::shutdown(void) + { + // FIXME: todo + return true; + } + +} diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h new file mode 100644 index 000000000..731c2523d --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h @@ -0,0 +1,50 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + + */ + +#ifndef __OpcUaStackPubSub_MsgTransProvider_h__ +#define __OpcUaStackPubSub_MsgTransProvider_h__ + +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT MsgTransProvider + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + MsgTransProvider( + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ); + ~MsgTransProvider(void); + + bool startup(void); + bool shutdown(void); + + private: + OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread + }; + +} + +#endif From 42b163524957231a50e731b82cdf1dd33e91bbaa Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 15:10:42 +0100 Subject: [PATCH 08/11] add class MsgTransSubscriber --- .../MessageTransport/MsgTransSubscriber.cpp | 77 +++++++++++++++++++ .../MessageTransport/MsgTransSubscriber.h | 50 ++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp create mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp new file mode 100644 index 000000000..293d84843 --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp @@ -0,0 +1,77 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + */ +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h" + +using namespace OpcUaStackCore; + +namespace OpcUaStackPubSub +{ + + MsgTransSubscriber::MsgTransSubscriber( + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ) + : ServerServiceBase() + { + // set parameter + ioThread_ = ioThread; + + // set parameter in server service base + serviceName_ = serviceName; + ServerServiceBase::ioThread_ = ioThread.get(); + strand_ = ioThread->createStrand(); + messageBus_ = messageBus; + + // register message bus receiver + MessageBusMemberConfig messageBusMemberConfig; + messageBusMemberConfig.strand(strand_); + messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); + + // activate receiver + activateReceiver( + [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { + // receive message from internal message bus + + // FIXME: todo + } + ); + } + + MsgTransSubscriber::~MsgTransSubscriber(void) + { + // deactivate receiver + deactivateReceiver(); + messageBus_->deregisterMember(messageBusMember_); + } + + bool + MsgTransSubscriber::startup(void) + { + // FIXME: todo + return true; + } + + bool + MsgTransSubscriber::shutdown(void) + { + // FIXME: todo + return true; + } + +} diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h new file mode 100644 index 000000000..cca918cee --- /dev/null +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h @@ -0,0 +1,50 @@ +/* + Copyright 2022 Kai Huebl (kai@huebl-sgh.de) + + Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser + Datei nur in Übereinstimmung mit der Lizenz erlaubt. + Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. + + Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, + erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE + GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. + + Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen + im Rahmen der Lizenz finden Sie in der Lizenz. + + Autor: Kai Huebl (kai@huebl-sgh.de) + + */ + +#ifndef __OpcUaStackPubSub_MsgTransSubscriber_h__ +#define __OpcUaStackPubSub_MsgTransSubscriber_h__ + +#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" + +namespace OpcUaStackPubSub +{ + + class DLLEXPORT MsgTransSubscriber + : public OpcUaStackCore::Object + , public OpcUaStackServer::ServerServiceBase + { + public: + using SPtr = boost::shared_ptr; + + MsgTransSubscriber( + const std::string& serviceName, + OpcUaStackCore::IOThread::SPtr& ioThread, + OpcUaStackCore::MessageBus::SPtr& messageBus + ); + ~MsgTransSubscriber(void); + + bool startup(void); + bool shutdown(void); + + private: + OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread + }; + +} + +#endif From 73700ef4574d4f61fde9a4106ccfba207ade9988 Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 15:36:25 +0100 Subject: [PATCH 09/11] add some interface functions --- ...ransProvider.cpp => MsgTransPublisher.cpp} | 13 +++++++------ ...MsgTransProvider.h => MsgTransPublisher.h} | 19 +++++++++++++------ .../MessageTransport/MsgTransSubscriber.cpp | 19 +++++++++++++++++++ .../MessageTransport/MsgTransSubscriber.h | 16 ++++++++++++++++ 4 files changed, 55 insertions(+), 12 deletions(-) rename src/OpcUaStackPubSub/MessageTransport/{MsgTransProvider.cpp => MsgTransPublisher.cpp} (87%) rename src/OpcUaStackPubSub/MessageTransport/{MsgTransProvider.h => MsgTransPublisher.h} (67%) diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp similarity index 87% rename from src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp rename to src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp index f70fbc94e..8a3d43f90 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp @@ -14,15 +14,16 @@ Autor: Kai Huebl (kai@huebl-sgh.de) */ -#include "OpcUaStackCore/Base/Log.h" -#include "OpcUaStackPubSub/MessageTransport/MsgTransProvider.h" + +#include "OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h" using namespace OpcUaStackCore; namespace OpcUaStackPubSub { - MsgTransProvider::MsgTransProvider( + MsgTransPublisher::MsgTransPublisher( + const std::string& connectionName, const std::string& serviceName, OpcUaStackCore::IOThread::SPtr& ioThread, OpcUaStackCore::MessageBus::SPtr& messageBus @@ -53,7 +54,7 @@ namespace OpcUaStackPubSub ); } - MsgTransProvider::~MsgTransProvider(void) + MsgTransPublisher::~MsgTransPublisher(void) { // deactivate receiver deactivateReceiver(); @@ -61,14 +62,14 @@ namespace OpcUaStackPubSub } bool - MsgTransProvider::startup(void) + MsgTransPublisher::startup(void) { // FIXME: todo return true; } bool - MsgTransProvider::shutdown(void) + MsgTransPublisher::shutdown(void) { // FIXME: todo return true; diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h similarity index 67% rename from src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h rename to src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h index 731c2523d..087cd16c7 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransProvider.h +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h @@ -14,29 +14,36 @@ Autor: Kai Huebl (kai@huebl-sgh.de) + DESCRIPTION: + + The message transport publisher module receives a message from one ore more + network message writer groups via the internal message bus. The received message + is forwarded to the assigned communication module via the internal message bus. + */ -#ifndef __OpcUaStackPubSub_MsgTransProvider_h__ -#define __OpcUaStackPubSub_MsgTransProvider_h__ +#ifndef __OpcUaStackPubSub_MsgTransPublisher_h__ +#define __OpcUaStackPubSub_MsgTransPublisher_h__ #include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" namespace OpcUaStackPubSub { - class DLLEXPORT MsgTransProvider + class DLLEXPORT MsgTransPublisher : public OpcUaStackCore::Object , public OpcUaStackServer::ServerServiceBase { public: - using SPtr = boost::shared_ptr; + using SPtr = boost::shared_ptr; - MsgTransProvider( + MsgTransPublisher( + const std::string& connectionName, // message bus member name const std::string& serviceName, OpcUaStackCore::IOThread::SPtr& ioThread, OpcUaStackCore::MessageBus::SPtr& messageBus ); - ~MsgTransProvider(void); + ~MsgTransPublisher(void); bool startup(void); bool shutdown(void); diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp index 293d84843..767bf61e3 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp @@ -74,4 +74,23 @@ namespace OpcUaStackPubSub return true; } + bool + MsgTransSubscriber::registerNetworkMessageProcessor( + uint32_t publisherId, // publisher id + const std::string& networkMessageProcessorName // message bus member name + ) + { + // FIXME: todo + return true; + } + + bool + MsgTransSubscriber::deregisterNetworkMessageProcessor( + uint32_t publisherId + ) + { + // FIXME: todo + return true; + } + } diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h index cca918cee..43de15af5 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h @@ -14,6 +14,14 @@ Autor: Kai Huebl (kai@huebl-sgh.de) + DESCRIPTION: + + The message transport subscriber module receives a message from the communication + module via the internal message bus.The publisher ID is determined from the received + message and the assigned network message processor is determined. The received message + is forwarded to the assigned network message processor module via the internal message + bus. + */ #ifndef __OpcUaStackPubSub_MsgTransSubscriber_h__ @@ -41,6 +49,14 @@ namespace OpcUaStackPubSub bool startup(void); bool shutdown(void); + bool registerNetworkMessageProcessor( + uint32_t publisherId, // publisher id + const std::string& networkMessageProcessorName // message bus member name + ); + bool deregisterNetworkMessageProcessor( + uint32_t publisherId + ); + private: OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread }; From db57b08e659d8d9a9136cd10fc0fd773193ff22c Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 15:47:02 +0100 Subject: [PATCH 10/11] add receiver function --- .../MessageTransport/MsgTransPublisher.cpp | 19 ++++++++++++++++++- .../MessageTransport/MsgTransSubscriber.cpp | 18 +++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp index 8a3d43f90..89a89950d 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp @@ -15,6 +15,8 @@ Autor: Kai Huebl (kai@huebl-sgh.de) */ +#include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" #include "OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h" using namespace OpcUaStackCore; @@ -49,7 +51,22 @@ namespace OpcUaStackPubSub [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { // receive message from internal message bus - // FIXME: todo + auto event = boost::static_pointer_cast(message); + switch (event->eventType()) + { + case EventType::NetworkSendEvent: + { + NetworkSendEvent::SPtr event = boost::static_pointer_cast(message); + // FIXME: todo + break; + } + default: + { + Log(Error, "invalid message received in message transport module") + .parameter("ServiceName", serviceName_) + .parameter("Event", (uint32_t)event->eventType()); + } + } } ); } diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp index 767bf61e3..1224dd191 100644 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp +++ b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp @@ -15,6 +15,7 @@ Autor: Kai Huebl (kai@huebl-sgh.de) */ #include "OpcUaStackCore/Base/Log.h" +#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" #include "OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h" using namespace OpcUaStackCore; @@ -48,7 +49,22 @@ namespace OpcUaStackPubSub [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { // receive message from internal message bus - // FIXME: todo + auto event = boost::static_pointer_cast(message); + switch (event->eventType()) + { + case EventType::NetworkRecvEvent: + { + NetworkRecvEvent::SPtr event = boost::static_pointer_cast(message); + // FIXME: todo + break; + } + default: + { + Log(Error, "invalid message received in message transport module") + .parameter("ServiceName", serviceName_) + .parameter("Event", (uint32_t)event->eventType()); + } + } } ); } From 28d84f4370a81cf5b398460f394f306d66bc9720 Mon Sep 17 00:00:00 2001 From: Kai Huebl Date: Fri, 18 Mar 2022 16:00:36 +0100 Subject: [PATCH 11/11] rename class --- .../MessageTransport/MsgTransPublisher.cpp | 95 --------------- .../MessageTransport/MsgTransPublisher.h | 57 --------- .../MessageTransport/MsgTransSubscriber.cpp | 112 ------------------ .../MessageTransport/MsgTransSubscriber.h | 66 ----------- 4 files changed, 330 deletions(-) delete mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp delete mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h delete mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp delete mode 100644 src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp deleted file mode 100644 index 89a89950d..000000000 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* - Copyright 2022 Kai Huebl (kai@huebl-sgh.de) - - Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser - Datei nur in Übereinstimmung mit der Lizenz erlaubt. - Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. - - Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, - erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE - GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. - - Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen - im Rahmen der Lizenz finden Sie in der Lizenz. - - Autor: Kai Huebl (kai@huebl-sgh.de) - */ - -#include "OpcUaStackCore/Base/Log.h" -#include "OpcUaStackPubSub/Events/NetworkSendEvent.h" -#include "OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h" - -using namespace OpcUaStackCore; - -namespace OpcUaStackPubSub -{ - - MsgTransPublisher::MsgTransPublisher( - const std::string& connectionName, - const std::string& serviceName, - OpcUaStackCore::IOThread::SPtr& ioThread, - OpcUaStackCore::MessageBus::SPtr& messageBus - ) - : ServerServiceBase() - { - // set parameter - ioThread_ = ioThread; - - // set parameter in server service base - serviceName_ = serviceName; - ServerServiceBase::ioThread_ = ioThread.get(); - strand_ = ioThread->createStrand(); - messageBus_ = messageBus; - - // register message bus receiver - MessageBusMemberConfig messageBusMemberConfig; - messageBusMemberConfig.strand(strand_); - messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); - - // activate receiver - activateReceiver( - [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { - // receive message from internal message bus - - auto event = boost::static_pointer_cast(message); - switch (event->eventType()) - { - case EventType::NetworkSendEvent: - { - NetworkSendEvent::SPtr event = boost::static_pointer_cast(message); - // FIXME: todo - break; - } - default: - { - Log(Error, "invalid message received in message transport module") - .parameter("ServiceName", serviceName_) - .parameter("Event", (uint32_t)event->eventType()); - } - } - } - ); - } - - MsgTransPublisher::~MsgTransPublisher(void) - { - // deactivate receiver - deactivateReceiver(); - messageBus_->deregisterMember(messageBusMember_); - } - - bool - MsgTransPublisher::startup(void) - { - // FIXME: todo - return true; - } - - bool - MsgTransPublisher::shutdown(void) - { - // FIXME: todo - return true; - } - -} diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h deleted file mode 100644 index 087cd16c7..000000000 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransPublisher.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - Copyright 2022 Kai Huebl (kai@huebl-sgh.de) - - Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser - Datei nur in Übereinstimmung mit der Lizenz erlaubt. - Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. - - Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, - erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE - GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. - - Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen - im Rahmen der Lizenz finden Sie in der Lizenz. - - Autor: Kai Huebl (kai@huebl-sgh.de) - - DESCRIPTION: - - The message transport publisher module receives a message from one ore more - network message writer groups via the internal message bus. The received message - is forwarded to the assigned communication module via the internal message bus. - - */ - -#ifndef __OpcUaStackPubSub_MsgTransPublisher_h__ -#define __OpcUaStackPubSub_MsgTransPublisher_h__ - -#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" - -namespace OpcUaStackPubSub -{ - - class DLLEXPORT MsgTransPublisher - : public OpcUaStackCore::Object - , public OpcUaStackServer::ServerServiceBase - { - public: - using SPtr = boost::shared_ptr; - - MsgTransPublisher( - const std::string& connectionName, // message bus member name - const std::string& serviceName, - OpcUaStackCore::IOThread::SPtr& ioThread, - OpcUaStackCore::MessageBus::SPtr& messageBus - ); - ~MsgTransPublisher(void); - - bool startup(void); - bool shutdown(void); - - private: - OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread - }; - -} - -#endif diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp deleted file mode 100644 index 1224dd191..000000000 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.cpp +++ /dev/null @@ -1,112 +0,0 @@ -/* - Copyright 2022 Kai Huebl (kai@huebl-sgh.de) - - Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser - Datei nur in Übereinstimmung mit der Lizenz erlaubt. - Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. - - Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, - erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE - GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. - - Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen - im Rahmen der Lizenz finden Sie in der Lizenz. - - Autor: Kai Huebl (kai@huebl-sgh.de) - */ -#include "OpcUaStackCore/Base/Log.h" -#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h" -#include "OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h" - -using namespace OpcUaStackCore; - -namespace OpcUaStackPubSub -{ - - MsgTransSubscriber::MsgTransSubscriber( - const std::string& serviceName, - OpcUaStackCore::IOThread::SPtr& ioThread, - OpcUaStackCore::MessageBus::SPtr& messageBus - ) - : ServerServiceBase() - { - // set parameter - ioThread_ = ioThread; - - // set parameter in server service base - serviceName_ = serviceName; - ServerServiceBase::ioThread_ = ioThread.get(); - strand_ = ioThread->createStrand(); - messageBus_ = messageBus; - - // register message bus receiver - MessageBusMemberConfig messageBusMemberConfig; - messageBusMemberConfig.strand(strand_); - messageBusMember_ = messageBus_->registerMember(serviceName_, messageBusMemberConfig); - - // activate receiver - activateReceiver( - [this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) { - // receive message from internal message bus - - auto event = boost::static_pointer_cast(message); - switch (event->eventType()) - { - case EventType::NetworkRecvEvent: - { - NetworkRecvEvent::SPtr event = boost::static_pointer_cast(message); - // FIXME: todo - break; - } - default: - { - Log(Error, "invalid message received in message transport module") - .parameter("ServiceName", serviceName_) - .parameter("Event", (uint32_t)event->eventType()); - } - } - } - ); - } - - MsgTransSubscriber::~MsgTransSubscriber(void) - { - // deactivate receiver - deactivateReceiver(); - messageBus_->deregisterMember(messageBusMember_); - } - - bool - MsgTransSubscriber::startup(void) - { - // FIXME: todo - return true; - } - - bool - MsgTransSubscriber::shutdown(void) - { - // FIXME: todo - return true; - } - - bool - MsgTransSubscriber::registerNetworkMessageProcessor( - uint32_t publisherId, // publisher id - const std::string& networkMessageProcessorName // message bus member name - ) - { - // FIXME: todo - return true; - } - - bool - MsgTransSubscriber::deregisterNetworkMessageProcessor( - uint32_t publisherId - ) - { - // FIXME: todo - return true; - } - -} diff --git a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h b/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h deleted file mode 100644 index 43de15af5..000000000 --- a/src/OpcUaStackPubSub/MessageTransport/MsgTransSubscriber.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - Copyright 2022 Kai Huebl (kai@huebl-sgh.de) - - Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser - Datei nur in Übereinstimmung mit der Lizenz erlaubt. - Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0. - - Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart, - erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE - GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend. - - Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen - im Rahmen der Lizenz finden Sie in der Lizenz. - - Autor: Kai Huebl (kai@huebl-sgh.de) - - DESCRIPTION: - - The message transport subscriber module receives a message from the communication - module via the internal message bus.The publisher ID is determined from the received - message and the assigned network message processor is determined. The received message - is forwarded to the assigned network message processor module via the internal message - bus. - - */ - -#ifndef __OpcUaStackPubSub_MsgTransSubscriber_h__ -#define __OpcUaStackPubSub_MsgTransSubscriber_h__ - -#include "OpcUaStackServer/ServiceSet/ServerServiceBase.h" - -namespace OpcUaStackPubSub -{ - - class DLLEXPORT MsgTransSubscriber - : public OpcUaStackCore::Object - , public OpcUaStackServer::ServerServiceBase - { - public: - using SPtr = boost::shared_ptr; - - MsgTransSubscriber( - const std::string& serviceName, - OpcUaStackCore::IOThread::SPtr& ioThread, - OpcUaStackCore::MessageBus::SPtr& messageBus - ); - ~MsgTransSubscriber(void); - - bool startup(void); - bool shutdown(void); - - bool registerNetworkMessageProcessor( - uint32_t publisherId, // publisher id - const std::string& networkMessageProcessorName // message bus member name - ); - bool deregisterNetworkMessageProcessor( - uint32_t publisherId - ); - - private: - OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread - }; - -} - -#endif