Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions SilKit/IntegrationTests/ITest_Internals_DataPubSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,4 +805,83 @@ TEST_F(ITest_Internals_DataPubSub, test_1pub_1sub_async_rejoin)
ShutdownSystem();
}


// Two publishers (optional label1), one subscriber (optional label1, label2); publishers start first; subscriber joins
TEST_F(ITest_Internals_DataPubSub, test_2pub_1sub_async_starting_order)
{
const uint32_t numMsgToPublish = 1;
const uint32_t numMsgToReceive = 1 * numMsgToPublish;

std::vector<PubSubParticipant> publishers;
publishers.push_back({"Pub1",
{{"PubCtrl1",
"TopicA",
{"A"},
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}},
1,
defaultMsgSize,
numMsgToPublish}},
{}});
publishers.push_back({"Pub2",
{{"PubCtrl1",
"TopicA",
{"A"},
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional}},
1,
defaultMsgSize,
numMsgToPublish}},
{}});

std::vector<PubSubParticipant> subscribers;
std::vector<std::vector<uint8_t>> expectedDataUnordered;
expectedDataUnordered.reserve(numMsgToReceive);
for (uint32_t d = 0; d < numMsgToReceive; d++)
{
// Receive the same blob several times (once from every publisher)
expectedDataUnordered.emplace_back(std::vector<uint8_t>(defaultMsgSize, 0));
}
subscribers.push_back(
{"Sub1",
{},
{{
"SubCtrl1",
"TopicA",
{"A"},
{{"K1", "V1", SilKit::Services::MatchingLabel::Kind::Optional},
{"K2", "V2", SilKit::Services::MatchingLabel::Kind::Optional}
}, // BUGHUNT: Second label breaks communication
defaultMsgSize,
numMsgToReceive,
1,
expectedDataUnordered,
}}});

for (auto& sub : subscribers)
{
sub.communicationTimeout = std::chrono::milliseconds(1000);
}

_testSystem.SetupRegistryAndSystemMaster("silkit://localhost:0", false, {});


//BUGHUNT: Subscribers start first fails SOMETIMES
//RunParticipants(subscribers, _testSystem.GetRegistryUri(), false);

//BUGHUNT: Publishers start first fails ALWAYS

// Start publishers
RunParticipants(publishers, _testSystem.GetRegistryUri(), false);
for (auto& p : publishers)
{
p.WaitForAllSent();
}

// Start subscriber
RunParticipants(subscribers, _testSystem.GetRegistryUri(), false);


JoinPubSubThreads();
ShutdownSystem();
}

} // anonymous namespace
9 changes: 5 additions & 4 deletions SilKit/IntegrationTests/ITest_Internals_DataPubSub.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class ITest_Internals_DataPubSub : public testing::Test
, name{newName}
, dataSubscribers{newDataSubscribers}
, dataPublishers{newDataPublishers}
, allReceived{std::make_unique<std::atomic<bool>>(false)}
{
}

Expand All @@ -186,6 +187,7 @@ class ITest_Internals_DataPubSub : public testing::Test
std::string name;
std::vector<DataSubscriberInfo> dataSubscribers;
std::vector<DataPublisherInfo> dataPublishers;
std::unique_ptr<std::atomic<bool>> allReceived;
std::unique_ptr<SilKit::IParticipant> participant;
SilKit::Core::IParticipantInternal* participantImpl = nullptr;

Expand All @@ -196,7 +198,6 @@ class ITest_Internals_DataPubSub : public testing::Test
std::promise<void> allDiscoveredPromise;
bool allDiscovered{false};
std::promise<void> allReceivedPromise;
bool allReceived{false};
// Pub
std::promise<void> allSentPromise;
bool allSent{false};
Expand All @@ -208,7 +209,7 @@ class ITest_Internals_DataPubSub : public testing::Test
if (std::all_of(dataSubscribers.begin(), dataSubscribers.end(),
[](const auto& dsInfo) { return dsInfo.numMsgToReceive == 0; }))
{
allReceived = true;
*allReceived = true;
allReceivedPromise.set_value();
}
}
Expand All @@ -224,11 +225,11 @@ class ITest_Internals_DataPubSub : public testing::Test

void CheckAllReceivedPromise()
{
if (!allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) {
if (!*allReceived && std::all_of(dataSubscribers.begin(), dataSubscribers.end(), [](const auto& dsInfo) {
return dsInfo.allReceived;
}))
{
allReceived = true;
*allReceived = true;
allReceivedPromise.set_value();
}
}
Expand Down
7 changes: 5 additions & 2 deletions SilKit/IntegrationTests/IntegrationTestInfrastructure.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class TestInfrastructure
{
std::stringstream ss;
ss << "Something went wrong: " << error.what() << std::endl;
_systemMaster.systemController->AbortSimulation();
if (_systemMaster.systemController)
{
_systemMaster.systemController->AbortSimulation();
}
FAIL() << ss.str();
}

Expand Down Expand Up @@ -127,7 +130,7 @@ class TestInfrastructure
struct SystemMaster
{
std::unique_ptr<IParticipant> participant;
SilKit::Experimental::Services::Orchestration::ISystemController* systemController;
SilKit::Experimental::Services::Orchestration::ISystemController* systemController{nullptr};
ISystemMonitor* systemMonitor;
ILifecycleService* lifecycleService;

Expand Down
6 changes: 6 additions & 0 deletions SilKit/include/silkit/services/datatypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <cstdint>
#include <string>
#include <tuple>

#include "silkit/util/HandlerId.hpp"

Expand Down Expand Up @@ -44,6 +45,11 @@ struct MatchingLabel
std::string key; //!< The label's key.
std::string value; //!< The label's key.
Kind kind; //!< The matching kind to apply for this label.

friend bool operator==(const MatchingLabel& lhs, const MatchingLabel& rhs) noexcept
{
return std::tie(lhs.key, lhs.value, lhs.kind) == std::tie(rhs.key, rhs.value, rhs.kind);
}
};

using SilKit::Util::HandlerId;
Expand Down
12 changes: 12 additions & 0 deletions SilKit/source/core/internal/ServiceDescriptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class ServiceDescriptor

inline bool GetSupplementalDataItem(const std::string& key, std::string& value) const;
inline void SetSupplementalDataItem(std::string key, std::string val);
inline std::string getVal(const std::string& key) const;

inline auto GetSimulationName() const -> const std::string&;
inline void SetSimulationName(const std::string& simulationName);
Expand Down Expand Up @@ -137,6 +138,17 @@ void ServiceDescriptor::SetSupplementalDataItem(std::string key, std::string val
_supplementalData[key] = std::move(val);
}

std::string ServiceDescriptor::getVal(const std::string& key) const
{
std::string tmp;
if(GetSupplementalDataItem(key, tmp) == false)
{
throw SilKit::StateError{"Unknown key in supplementalData"};
}

return tmp;
}

auto ServiceDescriptor::GetParticipantId() const -> ParticipantId
{
return _participantId;
Expand Down
Loading
Loading