diff --git a/SilKit/IntegrationTests/ITest_AsyncSimTask.cpp b/SilKit/IntegrationTests/ITest_AsyncSimTask.cpp index e97fb04e0..589293df2 100644 --- a/SilKit/IntegrationTests/ITest_AsyncSimTask.cpp +++ b/SilKit/IntegrationTests/ITest_AsyncSimTask.cpp @@ -362,4 +362,126 @@ TEST(ITest_AsyncSimTask, test_async_simtask_other_simulation_steps_completed_han cd.AssertTimestampsAlwaysOutsideStep(stepLastOpen); } +// Verify timestamp behavior from the synchronized participant perspective: +// - Sync sender: outgoing event carries virtual send time. +// - Async sender: outgoing events will have an invalid timestamp +// - Sync receiver: incoming event from async or sync sender is timestamped with sync receive time. +// - Async receiver: incoming event from async or sync sender keeps the timestamp (async sender: invalid timestamp, sync sender: send time) +TEST(ITest_AsyncSimTask, test_timestamp_handling) +{ + SimTestHarness testHarness({"Sync"}, "silkit://localhost:0", false, false, {"Async", "Async2"}); + + const auto dataSpecSyncToAsync = SilKit::Services::PubSub::PubSubSpec{"TopicSyncToAsync", "A"}; + const auto dataSpecAsyncToSync = SilKit::Services::PubSub::PubSubSpec{"TopicAsyncToSync", "A"}; + const auto dataSpecAsyncToAsync = SilKit::Services::PubSub::PubSubSpec{"TopicAsyncToAsync", "A"}; + const auto dataSpecSyncToSync = SilKit::Services::PubSub::PubSubSpec{"TopicSyncToSync", "A"}; + + auto* syncParticipant = testHarness.GetParticipant("Sync"); + auto* asyncParticipant = testHarness.GetParticipant("Async"); + auto* async2Participant = testHarness.GetParticipant("Async2"); + + auto* syncLifecycleService = syncParticipant->GetOrCreateLifecycleService(); + auto* asyncLifecycleService = asyncParticipant->GetOrCreateLifecycleService(); + auto* async2LifecycleService = async2Participant->GetOrCreateLifecycleService(); + auto* syncTimeSyncService = syncParticipant->GetOrCreateTimeSyncService(); + + auto* syncPublisher = syncParticipant->Participant()->CreateDataPublisher("SyncPub", dataSpecSyncToAsync); + auto* asyncPublisher = asyncParticipant->Participant()->CreateDataPublisher("AsyncPub", dataSpecAsyncToSync); + auto* asyncPublisherToAsync2 = asyncParticipant->Participant()->CreateDataPublisher("AsyncPubToAsync2", dataSpecAsyncToAsync); + auto* syncPublisherToSync = syncParticipant->Participant()->CreateDataPublisher("SyncPubToSync", dataSpecSyncToSync); + + std::atomic syncSendTimeNs{std::chrono::nanoseconds::min().count()}; + std::atomic syncCurrentStepNs{std::chrono::nanoseconds::min().count()}; + std::atomic syncToAsyncReceived{false}; + std::atomic asyncToSyncReceived{false}; + std::atomic asyncToAsyncReceived{false}; + std::atomic syncToSyncReceived{false}; + std::atomic syncSent{false}; + + asyncParticipant->Participant()->CreateDataSubscriber( + "AsyncSub", dataSpecSyncToAsync, + [asyncPublisher, asyncPublisherToAsync2, &syncSendTimeNs, + &syncToAsyncReceived](SilKit::Services::PubSub::IDataSubscriber* /*subscriber*/, + const SilKit::Services::PubSub::DataMessageEvent& dataMessageEvent) { + const auto expectedTimestamp = std::chrono::nanoseconds{syncSendTimeNs.load()}; + EXPECT_NE(expectedTimestamp, std::chrono::nanoseconds::min()); + EXPECT_EQ(dataMessageEvent.timestamp, expectedTimestamp); + + if (!syncToAsyncReceived.exchange(true)) + { + asyncPublisher->Publish(std::vector{0xA5}); + asyncPublisherToAsync2->Publish(std::vector{0x3C}); + } + }); + + syncParticipant->Participant()->CreateDataSubscriber( + "SyncSub", dataSpecAsyncToSync, + [&syncCurrentStepNs, &asyncToSyncReceived](SilKit::Services::PubSub::IDataSubscriber* /*subscriber*/, + const SilKit::Services::PubSub::DataMessageEvent& dataMessageEvent) { + const auto currentStep = std::chrono::nanoseconds{syncCurrentStepNs.load()}; + + EXPECT_TRUE((dataMessageEvent.timestamp == currentStep) || (dataMessageEvent.timestamp == currentStep + 1ms) + || (dataMessageEvent.timestamp == currentStep - 1ms)) + << "Incoming async message should be timestamped with the sync receiver time (+/-1ms tolerance)."; + asyncToSyncReceived = true; + }); + + syncParticipant->Participant()->CreateDataSubscriber( + "SyncSubFromSync", dataSpecSyncToSync, + [&syncCurrentStepNs, &syncToSyncReceived](SilKit::Services::PubSub::IDataSubscriber* /*subscriber*/, + const SilKit::Services::PubSub::DataMessageEvent& dataMessageEvent) { + const auto currentStep = std::chrono::nanoseconds{syncCurrentStepNs.load()}; + + EXPECT_TRUE((dataMessageEvent.timestamp == currentStep) || (dataMessageEvent.timestamp == currentStep + 1ms) + || (dataMessageEvent.timestamp == currentStep - 1ms)) + << "Incoming sync message should be timestamped with the sync receiver time (+/-1ms tolerance)."; + syncToSyncReceived = true; + }); + + async2Participant->Participant()->CreateDataSubscriber( + "Async2Sub", dataSpecAsyncToAsync, + [&asyncToAsyncReceived](SilKit::Services::PubSub::IDataSubscriber* /*subscriber*/, + const SilKit::Services::PubSub::DataMessageEvent& dataMessageEvent) { + EXPECT_EQ(dataMessageEvent.timestamp, std::chrono::nanoseconds::min()) + << "Outgoing async messages must have invalid timestamps."; + asyncToAsyncReceived = true; + }); + + syncTimeSyncService->SetSimulationStepHandler( + [syncLifecycleService, asyncLifecycleService, async2LifecycleService, syncPublisher, syncPublisherToSync, + &syncSendTimeNs, &syncCurrentStepNs, &syncSent, &syncToAsyncReceived, &asyncToSyncReceived, + &asyncToAsyncReceived, &syncToSyncReceived](std::chrono::nanoseconds now, + std::chrono::nanoseconds /*duration*/) { + syncCurrentStepNs = now.count(); + + if (!syncSent && now >= 1ms) + { + syncSendTimeNs = now.count(); + syncPublisher->Publish(std::vector{0x5A}); + syncPublisherToSync->Publish(std::vector{0xC3}); + syncSent = true; + } + + if (syncToAsyncReceived && asyncToSyncReceived && asyncToAsyncReceived && syncToSyncReceived) + { + asyncLifecycleService->Stop("Timestamp handling verified"); + async2LifecycleService->Stop("Timestamp handling verified"); + syncLifecycleService->Stop("Timestamp handling verified"); + } + else if (now >= 20ms) + { + asyncLifecycleService->Stop("Timestamp handling timed out"); + async2LifecycleService->Stop("Timestamp handling timed out"); + syncLifecycleService->Stop("Timestamp handling timed out"); + } + }, + 1ms); + + ASSERT_TRUE(testHarness.Run(5s)); + ASSERT_TRUE(syncToAsyncReceived.load()); + ASSERT_TRUE(asyncToSyncReceived.load()); + ASSERT_TRUE(asyncToAsyncReceived.load()); + ASSERT_TRUE(syncToSyncReceived.load()); +} + } // anonymous namespace