From 28fbef9380a8e0c27e1f6056e43e9fb0d251d98e Mon Sep 17 00:00:00 2001 From: Konrad Breitsprecher Date: Fri, 26 Jun 2026 15:04:09 +0200 Subject: [PATCH] Add initial timecoordinator Itests Signed-off-by: Konrad Breitsprecher --- SilKit/IntegrationTests/CMakeLists.txt | 4 + .../ITest_TimeCoordinator.cpp | 427 ++++++++++++++++++ .../SimTestHarness/SimTestHarness.cpp | 18 +- .../SimTestHarness/SimTestHarness.hpp | 6 +- 4 files changed, 447 insertions(+), 8 deletions(-) create mode 100644 SilKit/IntegrationTests/ITest_TimeCoordinator.cpp diff --git a/SilKit/IntegrationTests/CMakeLists.txt b/SilKit/IntegrationTests/CMakeLists.txt index 0eb1b3a59..96934e967 100644 --- a/SilKit/IntegrationTests/CMakeLists.txt +++ b/SilKit/IntegrationTests/CMakeLists.txt @@ -230,6 +230,10 @@ add_silkit_test_to_executable(SilKitIntegrationTests SOURCES ITest_Orchestration.cpp ) +add_silkit_test_to_executable(SilKitIntegrationTests + SOURCES ITest_TimeCoordinator.cpp +) + add_silkit_test_to_executable(SilKitIntegrationTests SOURCES ITest_MessageAggregation.cpp ) diff --git a/SilKit/IntegrationTests/ITest_TimeCoordinator.cpp b/SilKit/IntegrationTests/ITest_TimeCoordinator.cpp new file mode 100644 index 000000000..908b941c9 --- /dev/null +++ b/SilKit/IntegrationTests/ITest_TimeCoordinator.cpp @@ -0,0 +1,427 @@ +// SPDX-FileCopyrightText: 2026 Vector Informatik GmbH +// +// SPDX-License-Identifier: MIT + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ITestFixture.hpp" +#include "silkit/services/pubsub/all.hpp" + +#include "gtest/gtest.h" + +namespace { + +using namespace std::chrono_literals; +using namespace SilKit::Services::PubSub; +using namespace SilKit::Tests; + +inline std::ostream& operator<<(std::ostream& out, std::chrono::nanoseconds timestamp) +{ + using namespace std::chrono_literals; + + if (timestamp % 1ms == 0ns) + { + out << (timestamp / 1ms) << "ms"; + } + else if (timestamp % 1us == 0ns) + { + out << (timestamp / 1us) << "us"; + } + else + { + out << timestamp.count() << "ns"; + } + return out; +} + +static auto ToReadableList(const std::vector& values) -> std::string +{ + std::ostringstream out; + out << "[ "; + for (size_t i = 0; i < values.size(); ++i) + { + if (i > 0) + { + out << ", "; + } + out << values[i]; + } + out << " ]"; + return out.str(); +} + +static auto ToReadableTimestamp(bool valid, std::chrono::nanoseconds value) -> std::string +{ + if (!valid) + { + return ""; + } + + std::ostringstream out; + out << value; + return out.str(); +} + +static auto MissingFromActual(const std::vector& expected, + const std::vector& actual) + -> std::vector +{ + std::vector missing; + missing.reserve(expected.size()); + + auto itActual = actual.begin(); + for (const auto expectedValue : expected) + { + itActual = std::find(itActual, actual.end(), expectedValue); + if (itActual == actual.end()) + { + missing.emplace_back(expectedValue); + } + else + { + ++itActual; + } + } + + return missing; +} + +struct ITest_TimeCoordinator : ITest_SimTestHarness +{ + using ITest_SimTestHarness::ITest_SimTestHarness; +}; + +struct TimeCoordinatorTestState +{ + std::mutex mx; + std::vector publisherSimTaskNows; + std::vector subscriberSimTaskNows; + std::vector timeCoordinatorSimTaskNows; + + std::atomic messagePublishedAt10ms{false}; + std::atomic messageReceived{false}; + std::atomic messageAvailableAt10msPlus1us{false}; + + std::atomic messageReceivedAtValid{false}; + std::atomic messageReceivedAt{0ns}; + + std::atomic pendingReceive{false}; +}; + +// This test models a future central time-coordinator behavior for two synchronized participants +// (Publisher and Subscriber) with a nominal step size of 1ms. +// +// Expected schedule per cycle around 10ms: +// 0ms, 1ms, ..., 10ms, 10ms + 1us, 11ms, 12ms +// i.e. one temporary slowdown step of 1us is inserted after 10ms. +// +// Communication expectation: +// - Publisher sends one PubSub message at 10ms. +// - Subscriber expects that message to be available at 10ms + 1us. +// +// TimeCoordinator participant: +// - Intends to change step sizes at runtime (1us at 10ms, 999us at 10ms+1us, 1ms at 11ms), +// but these calls are currently missing in the API and therefore represented as commented placeholders. +TEST_F(ITest_TimeCoordinator, test_fixed_schedule_slowdown_by_time_coordinator) +{ + SetupFromParticipantList({"Publisher", "Subscriber", "TimeCoordinator"}); + + auto state = std::make_shared(); + + { + auto&& simParticipant = _simTestHarness->GetParticipant("Publisher", "", false); + auto&& participant = simParticipant->Participant(); + auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); + auto&& timeSyncService = lifecycleService->CreateTimeSyncService(); + auto&& publisher = participant->CreateDataPublisher("Pub", PubSubSpec{"TimeCoordinatorTopic", ""}); + + timeSyncService->SetSimulationStepHandler([state, lifecycleService, publisher](auto now, auto) { + { + std::lock_guard lock{state->mx}; + state->publisherSimTaskNows.emplace_back(now); + } + + if (now == 10ms) + { + const std::vector payload{0xAA}; + publisher->Publish(payload); + state->messagePublishedAt10ms = true; + } + + if (now >= 12ms) + { + lifecycleService->Stop("Test complete"); + } + }, 1ms); + } + + { + auto&& simParticipant = _simTestHarness->GetParticipant("Subscriber", "", false); + auto&& participant = simParticipant->Participant(); + auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); + auto&& timeSyncService = lifecycleService->CreateTimeSyncService(); + + participant->CreateDataSubscriber( + "Sub", PubSubSpec{"TimeCoordinatorTopic", ""}, + [state](IDataSubscriber*, const DataMessageEvent&) { + state->pendingReceive = true; + }); + + timeSyncService->SetSimulationStepHandler([state](auto now, auto) { + { + std::lock_guard lock{state->mx}; + state->subscriberSimTaskNows.emplace_back(now); + } + + if (state->pendingReceive.exchange(false)) + { + state->messageReceived = true; + if (!state->messageReceivedAtValid.load()) + { + state->messageReceivedAt = now; + state->messageReceivedAtValid = true; + } + } + + if (now == (10ms + 1us)) + { + state->messageAvailableAt10msPlus1us = state->messageReceived.load(); + } + }, 1ms); + } + + { + auto&& simParticipant = _simTestHarness->GetParticipant("TimeCoordinator", "", false); + auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); + auto&& timeSyncService = lifecycleService->CreateTimeSyncService(); + + timeSyncService->SetSimulationStepHandler([state](auto now, auto) { + { + std::lock_guard lock{state->mx}; + state->timeCoordinatorSimTaskNows.emplace_back(now); + } + + if (now == 10ms) + { + // timeSyncService->SetStepSize(1us); + } + if (now == (10ms + 1us)) + { + // timeSyncService->SetStepSize(999us); + } + if (now == 11ms) + { + // timeSyncService->SetStepSize(1ms); + } + }, 1ms); + } + + const auto ok = _simTestHarness->Run(5s); + ASSERT_TRUE(ok) << "SimTestHarness should terminate without timeout"; + + std::vector publisherNows; + std::vector subscriberNows; + std::vector timeCoordinatorNows; + { + std::lock_guard lock{state->mx}; + publisherNows = state->publisherSimTaskNows; + subscriberNows = state->subscriberSimTaskNows; + timeCoordinatorNows = state->timeCoordinatorSimTaskNows; + } + + const std::vector expectedNows{ + 0ms, + 1ms, + 2ms, + 3ms, + 4ms, + 5ms, + 6ms, + 7ms, + 8ms, + 9ms, + 10ms, + 10ms + 1us, + 11ms, + 12ms, + }; + + if (publisherNows != expectedNows) + { + ADD_FAILURE() << "Publisher schedule mismatch\n" + << "Expected: " << ToReadableList(expectedNows) << "\n" + << "Actual : " << ToReadableList(publisherNows) << "\n" + << "Missing : " << ToReadableList(MissingFromActual(expectedNows, publisherNows)); + } + + if (subscriberNows != expectedNows) + { + ADD_FAILURE() << "Subscriber schedule mismatch\n" + << "Expected: " << ToReadableList(expectedNows) << "\n" + << "Actual : " << ToReadableList(subscriberNows) << "\n" + << "Missing : " << ToReadableList(MissingFromActual(expectedNows, subscriberNows)); + } + + if (timeCoordinatorNows != expectedNows) + { + ADD_FAILURE() << "TimeCoordinator schedule mismatch\n" + << "Expected: " << ToReadableList(expectedNows) << "\n" + << "Actual : " << ToReadableList(timeCoordinatorNows) << "\n" + << "Missing : " << ToReadableList(MissingFromActual(expectedNows, timeCoordinatorNows)); + } + + EXPECT_TRUE(state->messagePublishedAt10ms.load()); + EXPECT_TRUE(state->messageReceived.load()); + EXPECT_TRUE(state->messageAvailableAt10msPlus1us.load()) + << "Message was not available at 10ms+1us as expected.\n" + << "Published@10ms : " << std::boolalpha << state->messagePublishedAt10ms.load() << "\n" + << "Received(any time) : " << std::boolalpha << state->messageReceived.load() << "\n" + << "ReceivedAt : " + << ToReadableTimestamp(state->messageReceivedAtValid.load(), state->messageReceivedAt.load()) << "\n" + << "Publisher schedule : " << ToReadableList(publisherNows) << "\n" + << "Subscriber schedule : " << ToReadableList(subscriberNows); +} +// Same as test_fixed_schedule_slowdown_by_time_coordinator, +// but the step size change is requested by the Publisher participant instead of a dedicated TimeCoordinator participant. +TEST_F(ITest_TimeCoordinator, test_fixed_schedule_slowdown_by_participant) +{ + SetupFromParticipantList({"Publisher", "Subscriber"}); + + auto state = std::make_shared(); + + { + auto&& simParticipant = _simTestHarness->GetParticipant("Publisher", "", false); + auto&& participant = simParticipant->Participant(); + auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); + auto&& timeSyncService = lifecycleService->CreateTimeSyncService(); + auto&& publisher = participant->CreateDataPublisher("Pub", PubSubSpec{"TimeCoordinatorTopic", ""}); + + timeSyncService->SetSimulationStepHandler([state, lifecycleService, publisher](auto now, auto) { + { + std::lock_guard lock{state->mx}; + state->publisherSimTaskNows.emplace_back(now); + } + + if (now == 10ms) + { + const std::vector payload{0xAA}; + publisher->Publish(payload); + state->messagePublishedAt10ms = true; + + // timeSyncService->SetStepSize(1us); + } + if (now == (10ms + 1us)) + { + // timeSyncService->SetStepSize(999us); + } + if (now == 11ms) + { + // timeSyncService->SetStepSize(1ms); + } + + if (now >= 12ms) + { + lifecycleService->Stop("Test complete"); + } + }, 1ms); + } + + { + auto&& simParticipant = _simTestHarness->GetParticipant("Subscriber", "", false); + auto&& participant = simParticipant->Participant(); + auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); + auto&& timeSyncService = lifecycleService->CreateTimeSyncService(); + + participant->CreateDataSubscriber( + "Sub", PubSubSpec{"TimeCoordinatorTopic", ""}, + [state](IDataSubscriber*, const DataMessageEvent&) { + state->pendingReceive = true; + }); + + timeSyncService->SetSimulationStepHandler([state](auto now, auto) { + { + std::lock_guard lock{state->mx}; + state->subscriberSimTaskNows.emplace_back(now); + } + + if (state->pendingReceive.exchange(false)) + { + state->messageReceived = true; + if (!state->messageReceivedAtValid.load()) + { + state->messageReceivedAt = now; + state->messageReceivedAtValid = true; + } + } + + if (now == (10ms + 1us)) + { + state->messageAvailableAt10msPlus1us = state->messageReceived.load(); + } + }, 1ms); + } + + const auto ok = _simTestHarness->Run(5s); + ASSERT_TRUE(ok) << "SimTestHarness should terminate without timeout"; + + std::vector publisherNows; + std::vector subscriberNows; + { + std::lock_guard lock{state->mx}; + publisherNows = state->publisherSimTaskNows; + subscriberNows = state->subscriberSimTaskNows; + } + + const std::vector expectedNows{ + 0ms, + 1ms, + 2ms, + 3ms, + 4ms, + 5ms, + 6ms, + 7ms, + 8ms, + 9ms, + 10ms, + 10ms + 1us, + 11ms, + 12ms, + }; + + if (publisherNows != expectedNows) + { + ADD_FAILURE() << "Publisher schedule mismatch\n" + << "Expected: " << ToReadableList(expectedNows) << "\n" + << "Actual : " << ToReadableList(publisherNows) << "\n" + << "Missing : " << ToReadableList(MissingFromActual(expectedNows, publisherNows)); + } + + if (subscriberNows != expectedNows) + { + ADD_FAILURE() << "Subscriber schedule mismatch\n" + << "Expected: " << ToReadableList(expectedNows) << "\n" + << "Actual : " << ToReadableList(subscriberNows) << "\n" + << "Missing : " << ToReadableList(MissingFromActual(expectedNows, subscriberNows)); + } + + EXPECT_TRUE(state->messagePublishedAt10ms.load()); + EXPECT_TRUE(state->messageReceived.load()); + EXPECT_TRUE(state->messageAvailableAt10msPlus1us.load()) + << "Message was not available at 10ms+1us as expected.\n" + << "Published@10ms : " << std::boolalpha << state->messagePublishedAt10ms.load() << "\n" + << "Received(any time) : " << std::boolalpha << state->messageReceived.load() << "\n" + << "ReceivedAt : " + << ToReadableTimestamp(state->messageReceivedAtValid.load(), state->messageReceivedAt.load()) << "\n" + << "Publisher schedule : " << ToReadableList(publisherNows) << "\n" + << "Subscriber schedule : " << ToReadableList(subscriberNows); +} + +} // namespace diff --git a/SilKit/IntegrationTests/SimTestHarness/SimTestHarness.cpp b/SilKit/IntegrationTests/SimTestHarness/SimTestHarness.cpp index 134a73534..5ea0d8a13 100644 --- a/SilKit/IntegrationTests/SimTestHarness/SimTestHarness.cpp +++ b/SilKit/IntegrationTests/SimTestHarness/SimTestHarness.cpp @@ -256,7 +256,8 @@ bool SimTestHarness::Run(std::chrono::nanoseconds testRunTimeout, const std::vec } SimParticipant* SimTestHarness::GetParticipant(const std::string& participantName, - const std::string& participantConfiguration) + const std::string& participantConfiguration, + bool createDefaultTimeSyncService) { auto lock = Lock(); if (_simParticipants.count(participantName) == 0) @@ -271,12 +272,15 @@ SimParticipant* SimTestHarness::GetParticipant(const std::string& participantNam else { AddParticipant(participantName, participantConfiguration, - {SilKit::Services::Orchestration::OperationMode::Autonomous}); + {SilKit::Services::Orchestration::OperationMode::Autonomous}, + createDefaultTimeSyncService); } } else { - AddParticipant(participantName, participantConfiguration); + AddParticipant(participantName, participantConfiguration, + {SilKit::Services::Orchestration::OperationMode::Coordinated}, + createDefaultTimeSyncService); } } return _simParticipants[participantName].get(); @@ -289,11 +293,12 @@ auto SimTestHarness::GetRegistryUri() const -> std::string SimParticipant* SimTestHarness::GetParticipant(const std::string& participantName) { - return GetParticipant(participantName, ""); + return GetParticipant(participantName, "", true); } void SimTestHarness::AddParticipant(const std::string& participantName, const std::string& participantConfiguration, - SilKit::Services::Orchestration::LifecycleConfiguration startConfiguration) + SilKit::Services::Orchestration::LifecycleConfiguration startConfiguration, + bool createDefaultTimeSyncService) { auto participant = std::make_unique(); participant->_name = participantName; @@ -304,7 +309,8 @@ void SimTestHarness::AddParticipant(const std::string& participantName, const st // mandatory sim task for time synced simulation // by default, we do no operation during simulation task, the user should override this auto* lifecycleService = participant->GetOrCreateLifecycleService(startConfiguration); - if (startConfiguration.operationMode == SilKit::Services::Orchestration::OperationMode::Coordinated) + if (createDefaultTimeSyncService + && startConfiguration.operationMode == SilKit::Services::Orchestration::OperationMode::Coordinated) { auto* timeSyncService = participant->GetOrCreateTimeSyncService(); timeSyncService->SetSimulationStepHandler([](auto, auto) {}, 1ms); diff --git a/SilKit/IntegrationTests/SimTestHarness/SimTestHarness.hpp b/SilKit/IntegrationTests/SimTestHarness/SimTestHarness.hpp index ef3f1a55a..38c72e598 100644 --- a/SilKit/IntegrationTests/SimTestHarness/SimTestHarness.hpp +++ b/SilKit/IntegrationTests/SimTestHarness/SimTestHarness.hpp @@ -114,7 +114,8 @@ class SimTestHarness //! \brief Get the SimParticipant by name SimParticipant* GetParticipant(const std::string& participantName); //! \brief Get the SimParticipant by name. If it does not exist yet, create a SimParticipant with the specified name and provide its ParticipantConfiguration as a string. - SimParticipant* GetParticipant(const std::string& participantName, const std::string& participantConfiguration); + SimParticipant* GetParticipant(const std::string& participantName, const std::string& participantConfiguration, + bool createDefaultTimeSyncService = true); auto GetRegistryUri() const -> std::string; auto GetRegistry() const -> SilKit::Vendor::Vector::ISilKitRegistry* @@ -130,7 +131,8 @@ class SimTestHarness private: void AddParticipant(const std::string& participantName, const std::string& participantConfiguration, SilKit::Services::Orchestration::LifecycleConfiguration startConfiguration = { - SilKit::Services::Orchestration::OperationMode::Coordinated}); + SilKit::Services::Orchestration::OperationMode::Coordinated}, + bool createDefaultTimeSyncService = true); bool IsSync(const std::string& participantName); bool IsAsync(const std::string& participantName);