Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions SilKit/IntegrationTests/ITest_AsyncSimTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,126 @@ TEST(ITest_AsyncSimTask, test_async_simtask_other_simulation_steps_completed_han
cd.AssertTimestampsAlwaysOutsideStep(stepLastOpen);
}

// Verify timestamp behavior from the synchronized participant perspective:
// - Sync sender: outgoing event carries virtual send time.
// - Async sender: outgoing events will have an invalid timestamp
// - Sync receiver: incoming event from async or sync sender is timestamped with sync receive time.
// - Async receiver: incoming event from async or sync sender keeps the timestamp (async sender: invalid timestamp, sync sender: send time)
TEST(ITest_AsyncSimTask, test_timestamp_handling)
{
SimTestHarness testHarness({"Sync"}, "silkit://localhost:0", false, false, {"Async", "Async2"});

const auto dataSpecSyncToAsync = SilKit::Services::PubSub::PubSubSpec{"TopicSyncToAsync", "A"};
const auto dataSpecAsyncToSync = SilKit::Services::PubSub::PubSubSpec{"TopicAsyncToSync", "A"};
const auto dataSpecAsyncToAsync = SilKit::Services::PubSub::PubSubSpec{"TopicAsyncToAsync", "A"};
const auto dataSpecSyncToSync = SilKit::Services::PubSub::PubSubSpec{"TopicSyncToSync", "A"};

auto* syncParticipant = testHarness.GetParticipant("Sync");
auto* asyncParticipant = testHarness.GetParticipant("Async");
auto* async2Participant = testHarness.GetParticipant("Async2");

auto* syncLifecycleService = syncParticipant->GetOrCreateLifecycleService();
auto* asyncLifecycleService = asyncParticipant->GetOrCreateLifecycleService();
auto* async2LifecycleService = async2Participant->GetOrCreateLifecycleService();
auto* syncTimeSyncService = syncParticipant->GetOrCreateTimeSyncService();

auto* syncPublisher = syncParticipant->Participant()->CreateDataPublisher("SyncPub", dataSpecSyncToAsync);
auto* asyncPublisher = asyncParticipant->Participant()->CreateDataPublisher("AsyncPub", dataSpecAsyncToSync);
auto* asyncPublisherToAsync2 = asyncParticipant->Participant()->CreateDataPublisher("AsyncPubToAsync2", dataSpecAsyncToAsync);
auto* syncPublisherToSync = syncParticipant->Participant()->CreateDataPublisher("SyncPubToSync", dataSpecSyncToSync);

std::atomic<std::chrono::nanoseconds::rep> syncSendTimeNs{std::chrono::nanoseconds::min().count()};
std::atomic<std::chrono::nanoseconds::rep> syncCurrentStepNs{std::chrono::nanoseconds::min().count()};
std::atomic<bool> syncToAsyncReceived{false};
std::atomic<bool> asyncToSyncReceived{false};
std::atomic<bool> asyncToAsyncReceived{false};
std::atomic<bool> syncToSyncReceived{false};
std::atomic<bool> syncSent{false};

asyncParticipant->Participant()->CreateDataSubscriber(
"AsyncSub", dataSpecSyncToAsync,
[asyncPublisher, asyncPublisherToAsync2, &syncSendTimeNs,
&syncToAsyncReceived](SilKit::Services::PubSub::IDataSubscriber* /*subscriber*/,
const SilKit::Services::PubSub::DataMessageEvent& dataMessageEvent) {
const auto expectedTimestamp = std::chrono::nanoseconds{syncSendTimeNs.load()};
EXPECT_NE(expectedTimestamp, std::chrono::nanoseconds::min());
EXPECT_EQ(dataMessageEvent.timestamp, expectedTimestamp);

if (!syncToAsyncReceived.exchange(true))
{
asyncPublisher->Publish(std::vector<uint8_t>{0xA5});
asyncPublisherToAsync2->Publish(std::vector<uint8_t>{0x3C});
}
});

syncParticipant->Participant()->CreateDataSubscriber(
"SyncSub", dataSpecAsyncToSync,
[&syncCurrentStepNs, &asyncToSyncReceived](SilKit::Services::PubSub::IDataSubscriber* /*subscriber*/,
const SilKit::Services::PubSub::DataMessageEvent& dataMessageEvent) {
const auto currentStep = std::chrono::nanoseconds{syncCurrentStepNs.load()};

EXPECT_TRUE((dataMessageEvent.timestamp == currentStep) || (dataMessageEvent.timestamp == currentStep + 1ms)
|| (dataMessageEvent.timestamp == currentStep - 1ms))
<< "Incoming async message should be timestamped with the sync receiver time (+/-1ms tolerance).";
asyncToSyncReceived = true;
});

syncParticipant->Participant()->CreateDataSubscriber(
"SyncSubFromSync", dataSpecSyncToSync,
[&syncCurrentStepNs, &syncToSyncReceived](SilKit::Services::PubSub::IDataSubscriber* /*subscriber*/,
const SilKit::Services::PubSub::DataMessageEvent& dataMessageEvent) {
const auto currentStep = std::chrono::nanoseconds{syncCurrentStepNs.load()};

EXPECT_TRUE((dataMessageEvent.timestamp == currentStep) || (dataMessageEvent.timestamp == currentStep + 1ms)
|| (dataMessageEvent.timestamp == currentStep - 1ms))
<< "Incoming sync message should be timestamped with the sync receiver time (+/-1ms tolerance).";
syncToSyncReceived = true;
});

async2Participant->Participant()->CreateDataSubscriber(
"Async2Sub", dataSpecAsyncToAsync,
[&asyncToAsyncReceived](SilKit::Services::PubSub::IDataSubscriber* /*subscriber*/,
const SilKit::Services::PubSub::DataMessageEvent& dataMessageEvent) {
EXPECT_EQ(dataMessageEvent.timestamp, std::chrono::nanoseconds::min())
<< "Outgoing async messages must have invalid timestamps.";
asyncToAsyncReceived = true;
});

syncTimeSyncService->SetSimulationStepHandler(
[syncLifecycleService, asyncLifecycleService, async2LifecycleService, syncPublisher, syncPublisherToSync,
&syncSendTimeNs, &syncCurrentStepNs, &syncSent, &syncToAsyncReceived, &asyncToSyncReceived,
&asyncToAsyncReceived, &syncToSyncReceived](std::chrono::nanoseconds now,
std::chrono::nanoseconds /*duration*/) {
syncCurrentStepNs = now.count();

if (!syncSent && now >= 1ms)
{
syncSendTimeNs = now.count();
syncPublisher->Publish(std::vector<uint8_t>{0x5A});
syncPublisherToSync->Publish(std::vector<uint8_t>{0xC3});
syncSent = true;
}

if (syncToAsyncReceived && asyncToSyncReceived && asyncToAsyncReceived && syncToSyncReceived)
{
asyncLifecycleService->Stop("Timestamp handling verified");
async2LifecycleService->Stop("Timestamp handling verified");
syncLifecycleService->Stop("Timestamp handling verified");
}
else if (now >= 20ms)
{
asyncLifecycleService->Stop("Timestamp handling timed out");
async2LifecycleService->Stop("Timestamp handling timed out");
syncLifecycleService->Stop("Timestamp handling timed out");
}
},
1ms);

ASSERT_TRUE(testHarness.Run(5s));
ASSERT_TRUE(syncToAsyncReceived.load());
ASSERT_TRUE(asyncToSyncReceived.load());
ASSERT_TRUE(asyncToAsyncReceived.load());
ASSERT_TRUE(syncToSyncReceived.load());
}

} // anonymous namespace
Loading