From 13373e31144f4e59023b3ad9155f69f26bbb765b Mon Sep 17 00:00:00 2001 From: Bartosz Burda Date: Thu, 14 May 2026 13:00:43 +0200 Subject: [PATCH] feat(gateway,#380): expose entity context on /faults/stream SSE events Each `GET /api/v1/faults/stream` event payload now carries an optional `x-medkit` SOVD payload-extension object with `entity_type` and `entity_id` fields when the gateway can resolve the fault's first reporting source back to a SOVD entity. Consumers can hit `/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}` directly instead of enumerating apps + components and HEAD-probing each one. Nested under `x-medkit` per the SOVD payload-extension convention (matches `x-medkit.aggregation_level`, `x-medkit.phase`, etc.). Flat `x-medkit-*` names are reserved for endpoint paths (`/x-medkit-graph`) and error codes, not payload fields. Resolution chain (mirrors gateway_node's node_resolver lambda for the manifest path): - Manifest / hybrid: cache's node-to-app linking index, both with and without the leading slash on the FQN. - Runtime fallback: FQN's last segment, validated against `cache.has_app()` so we never point consumers at a 404. - Runtime collision fallback: `_` form (slashes in the namespace replaced with `_`), matching the disambiguation rule in ros2_runtime_introspection.cpp for nodes that share a name across namespaces. - No match: `x-medkit` object omitted entirely; an `RCLCPP_DEBUG` trace records the unresolved source so operators can diagnose consumers stuck on the discovery fallback path. Backward-compatible for existing SOVD consumers. Entity resolution is snapshotted in `on_fault_event` (before the queue lock is acquired) and stored as `std::optional` on the buffered event. A discovery refresh between enqueue and stream-out cannot retroactively flip the entity reported to consumers, and the format path stays lock-free with respect to the entity cache. --- docs/api/rest.rst | 7 +- src/ros2_medkit_gateway/CHANGELOG.rst | 1 + src/ros2_medkit_gateway/README.md | 7 +- .../http/handlers/sse_fault_handler.hpp | 32 ++- .../src/http/handlers/sse_fault_handler.cpp | 100 +++++++-- .../test/test_sse_fault_handler.cpp | 197 ++++++++++++++++++ 6 files changed, 324 insertions(+), 20 deletions(-) diff --git a/docs/api/rest.rst b/docs/api/rest.rst index ee21d7e4..cab8e8d1 100644 --- a/docs/api/rest.rst +++ b/docs/api/rest.rst @@ -2372,7 +2372,12 @@ Other extensions beyond SOVD: - Vendor extension fields using ``x-medkit`` prefix (per SOVD extension mechanism) - ``DELETE /faults`` - Clear all faults globally -- ``GET /faults/stream`` - SSE real-time fault notifications +- ``GET /faults/stream`` - SSE real-time fault notifications. Each event payload carries an + optional ``x-medkit`` SOVD payload-extension object with ``entity_type`` and ``entity_id`` + fields when the gateway can resolve the fault's first reporting source back to an entity, + so consumers can hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` directly + without enumerating entities. Resolution is snapshotted at event arrival; the entire + ``x-medkit`` object is omitted when no entity can be resolved. - ``/health`` - Health check with discovery pipeline diagnostics - ``/version-info`` - Gateway version information - ``/docs`` - OpenAPI capability description diff --git a/src/ros2_medkit_gateway/CHANGELOG.rst b/src/ros2_medkit_gateway/CHANGELOG.rst index 8744997e..63861402 100644 --- a/src/ros2_medkit_gateway/CHANGELOG.rst +++ b/src/ros2_medkit_gateway/CHANGELOG.rst @@ -11,6 +11,7 @@ Unreleased **Features:** +* ``GET /api/v1/faults/stream`` event payloads now carry an optional ``x-medkit`` SOVD payload-extension object with ``entity_type`` and ``entity_id`` fields. When the gateway can resolve the fault's first reporting source back to a SOVD entity (via the manifest-mode linking index, or a runtime-mode last-segment match against an existing App), consumers can hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` directly instead of HEAD-probing every entity. Resolution is snapshotted at event arrival, so a discovery refresh between enqueue and stream-out cannot retroactively change the entity reported to consumers. The ``x-medkit`` object is omitted entirely when no entity can be resolved, so existing SSE consumers ignore the addition (`#380 `_) * Plugin API version bumped to v7. Adds ``PluginContext::notify_entities_changed(EntityChangeScope)`` lifecycle hook for plugins that mutate the entity surface at runtime; default no-op keeps v6 source code compiling unchanged against v7 headers. Binary compatibility is not provided: the plugin loader uses a strict equality check on ``plugin_api_version()``, so out-of-tree plugins must be recompiled (`#376 `_) * New ``discovery.manifest.fragments_dir`` parameter: gateway scans the directory for ``*.yaml`` / ``*.yml`` fragment files on every manifest load / reload and merges apps, components, and functions on top of the base manifest. Fragments are forbidden from declaring top-level ``areas``, ``metadata``, ``discovery``, ``scripts``, ``capabilities``, or ``lock_overrides`` - those stay in the base manifest. Presence of any forbidden key (including empty-valued ones like ``areas: []``) is reported as a ``FRAGMENT_FORBIDDEN_FIELD`` validation error that fails the load / reload. Unknown top-level keys (typos such as ``app:`` vs ``apps:``) are ignored with a warning log. Files merged in alphabetical order for deterministic duplicate-id errors (`#376 `_) * Fragment files are size-capped at 1 MiB (``ManifestParser::kMaxFragmentBytes``) before being read into memory, and any symlink resolving outside the canonical ``fragments_dir`` is skipped with a warning, so misconfigurations or symlink-based escapes cannot hand arbitrary bytes to the YAML parser (`#376 `_) diff --git a/src/ros2_medkit_gateway/README.md b/src/ros2_medkit_gateway/README.md index ff478cab..db7203a1 100644 --- a/src/ros2_medkit_gateway/README.md +++ b/src/ros2_medkit_gateway/README.md @@ -764,6 +764,7 @@ Real-time fault event stream using Server-Sent Events (SSE). Clients receive ins - **Automatic reconnection**: Supports `Last-Event-ID` header for seamless reconnection - **Keepalive**: Sends `:keepalive` comment every 30 seconds to prevent timeouts - **Event buffer**: Buffers up to 100 recent events for reconnecting clients +- **Entity context (SOVD payload extension)**: When the gateway can resolve the fault's first reporting source back to an entity, the payload carries an `x-medkit` object with `entity_type` and `entity_id` fields so consumers can hit `/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}` directly without enumerating entities **Event Types:** - `fault_confirmed` - Fault transitioned to CONFIRMED status @@ -781,13 +782,15 @@ curl -N http://localhost:8080/api/v1/faults/stream id: 1 event: fault_confirmed -data: {"event_type":"fault_confirmed","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830000.123} +data: {"event_type":"fault_confirmed","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830000.123,"x-medkit":{"entity_type":"apps","entity_id":"motor_controller"}} id: 2 event: fault_cleared -data: {"event_type":"fault_cleared","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830060.456} +data: {"event_type":"fault_cleared","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830060.456,"x-medkit":{"entity_type":"apps","entity_id":"motor_controller"}} ``` +**SOVD payload extension `x-medkit.entity_*`** (non-standard, SOVD-compatible): Resolution is best-effort and snapshotted at event arrival. Manifest and hybrid discovery use the linking result; runtime-only discovery falls back to the FQN's last segment and only emits the fields when an App with that ID exists in the cache. When no entity can be resolved (no reporting sources, orphan source, etc.), the entire `x-medkit` object is omitted and the consumer must fall back to discovery. + **Response (503 Service Unavailable - Client Limit Reached):** ```json { diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp index 072bb7e6..ccf95100 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/http/handlers/sse_fault_handler.hpp @@ -20,8 +20,11 @@ #include #include #include +#include #include +#include + #include "rclcpp/rclcpp.hpp" #include "ros2_medkit_gateway/core/http/sse_client_tracker.hpp" #include "ros2_medkit_gateway/http/handlers/handler_context.hpp" @@ -111,8 +114,33 @@ class SSEFaultHandler { /// Callback for fault events from ROS 2 topic void on_fault_event(const ros2_medkit_msgs::msg::FaultEvent::ConstSharedPtr & msg); + /// Resolved owning entity for a fault. Populates the ``x-medkit`` SOVD + /// payload-extension object on outgoing SSE events. + struct EntityContext { + std::string type; + std::string id; + }; + + /// Buffered queue entry. ``entity`` is resolved at enqueue time so a + /// discovery refresh between enqueue and stream-out cannot retroactively + /// flip the entity reported to consumers. + struct QueuedEvent { + uint64_t id; + ros2_medkit_msgs::msg::FaultEvent event; + std::optional entity; + }; + /// Format a fault event as SSE message - static std::string format_sse_event(const ros2_medkit_msgs::msg::FaultEvent & event, uint64_t event_id); + static std::string format_sse_event(const QueuedEvent & queued); + + /// Resolve the owning entity for a fault, snapshotting the cache. Manifest + /// / hybrid mode uses the cache's node-to-app index; runtime mode falls + /// back to the FQN's last segment, and for collision-disambiguated runtime + /// apps also to ``_`` per + /// ros2_runtime_introspection.cpp's renaming rule. Only emits a value when + /// an App with the resolved id actually exists in the cache - returns + /// ``std::nullopt`` otherwise so the consumer falls back to discovery. + std::optional resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fault) const; HandlerContext & ctx_; std::shared_ptr client_tracker_; @@ -123,7 +151,7 @@ class SSEFaultHandler { /// Event queue for broadcasting to clients mutable std::mutex queue_mutex_; std::condition_variable queue_cv_; - std::deque> event_queue_; + std::deque event_queue_; /// Monotonically increasing event ID for Last-Event-ID support std::atomic next_event_id_{1}; diff --git a/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp b/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp index ec724bef..e4a8df61 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp @@ -86,12 +86,17 @@ void SSEFaultHandler::request_shutdown() { void SSEFaultHandler::on_fault_event(const ros2_medkit_msgs::msg::FaultEvent::ConstSharedPtr & msg) { uint64_t event_id = next_event_id_.fetch_add(1); + // Snapshot entity context before acquiring the queue lock so cache state + // is pinned to the fault arrival timestamp and the formatting path stays + // lock-free with respect to the cache. + auto entity = resolve_entity_context(msg->fault); + std::size_t dropped_this_call = 0; { std::lock_guard lock(queue_mutex_); - // Add event to queue - event_queue_.emplace_back(event_id, *msg); + // Add event to queue with resolved entity context + event_queue_.push_back(QueuedEvent{event_id, *msg, std::move(entity)}); // Trim old events if buffer is full while (event_queue_.size() > kMaxBufferedEvents) { @@ -156,13 +161,13 @@ void SSEFaultHandler::handle_stream(const httplib::Request & req, httplib::Respo // First, send any buffered events the client missed (for reconnection) { std::lock_guard lock(queue_mutex_); - for (const auto & [id, event] : event_queue_) { - if (id > last_event_id) { - std::string sse_msg = format_sse_event(event, id); + for (const auto & queued : event_queue_) { + if (queued.id > last_event_id) { + std::string sse_msg = format_sse_event(queued); if (!sink.write(sse_msg.data(), sse_msg.size())) { return false; // Client disconnected } - last_event_id = id; + last_event_id = queued.id; } } } @@ -198,15 +203,15 @@ void SSEFaultHandler::handle_stream(const httplib::Request & req, httplib::Respo // Check for new events bool found_new = false; - for (const auto & [id, event] : event_queue_) { - if (id > last_event_id) { - std::string sse_msg = format_sse_event(event, id); + for (const auto & queued : event_queue_) { + if (queued.id > last_event_id) { + std::string sse_msg = format_sse_event(queued); lock.unlock(); if (!sink.write(sse_msg.data(), sse_msg.size())) { return false; // Client disconnected } lock.lock(); - last_event_id = id; + last_event_id = queued.id; found_new = true; } } @@ -230,24 +235,89 @@ size_t SSEFaultHandler::connected_clients() const { return client_tracker_->connected_clients(); } -std::string SSEFaultHandler::format_sse_event(const ros2_medkit_msgs::msg::FaultEvent & event, uint64_t event_id) { - const auto sanitized_event_type = sanitize_sse_event_type(event.event_type); +std::string SSEFaultHandler::format_sse_event(const QueuedEvent & queued) { + const auto sanitized_event_type = sanitize_sse_event_type(queued.event.event_type); nlohmann::json json_event; json_event["event_type"] = sanitized_event_type; - json_event["fault"] = ros2::conversions::fault_to_json(event.fault); + json_event["fault"] = ros2::conversions::fault_to_json(queued.event.fault); // Convert timestamp to seconds with nanosecond precision - double timestamp_sec = static_cast(event.timestamp.sec) + static_cast(event.timestamp.nanosec) * 1e-9; + double timestamp_sec = + static_cast(queued.event.timestamp.sec) + static_cast(queued.event.timestamp.nanosec) * 1e-9; json_event["timestamp"] = timestamp_sec; + // SOVD payload extension: nest ``entity_type`` / ``entity_id`` under the + // ``x-medkit`` response-extension object so global-stream consumers can + // hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` + // directly instead of HEAD-probing every entity. Flat ``x-medkit-*`` + // names are reserved for endpoint paths (``/x-medkit-graph``) and error + // codes, not payload fields. + if (queued.entity) { + json_event["x-medkit"] = {{"entity_type", queued.entity->type}, {"entity_id", queued.entity->id}}; + } + std::ostringstream sse; - sse << "id: " << event_id << "\n"; + sse << "id: " << queued.id << "\n"; sse << "event: " << sanitized_event_type << "\n"; sse << "data: " << json_event.dump() << "\n\n"; return sse.str(); } +std::optional +SSEFaultHandler::resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fault) const { + if (fault.reporting_sources.empty()) { + return std::nullopt; + } + const auto & raw_fqn = fault.reporting_sources.front(); + if (raw_fqn.empty()) { + return std::nullopt; + } + + const auto & cache = ctx_.node()->get_thread_safe_cache(); + + // Manifest / hybrid mode: the linking step populated node_to_app with the + // ROS FQN -> manifest app id mapping. Try both FQN forms (with and without + // the leading '/'), mirroring gateway_node's node_resolver lambda. + std::string entity_id = cache.resolve_node_to_app(raw_fqn); + if (entity_id.empty() && raw_fqn.front() == '/') { + entity_id = cache.resolve_node_to_app(raw_fqn.substr(1)); + } + + // Runtime fallback: synthetic apps are created with id = ROS node name + // (the FQN's last segment) when there is no namespace collision, or + // ``_`` (slashes in the namespace replaced with '_') when + // multiple nodes share the same name. See ros2_runtime_introspection.cpp. + // Only accept a candidate that actually exists as an App in the cache so + // we never point consumers at a 404. + if (entity_id.empty()) { + auto last_slash = raw_fqn.rfind('/'); + auto name = (last_slash != std::string::npos) ? raw_fqn.substr(last_slash + 1) : raw_fqn; + if (!name.empty() && cache.has_app(name)) { + entity_id = std::move(name); + } else if (last_slash != std::string::npos && last_slash > 0) { + // Try the collision-disambiguated form: ns prefix (sans leading '/'), + // slashes replaced with '_', then '_' + name. + auto ns_prefix = raw_fqn.substr(1, last_slash - 1); + std::replace(ns_prefix.begin(), ns_prefix.end(), '/', '_'); + auto namespaced = ns_prefix + "_" + name; + if (cache.has_app(namespaced)) { + entity_id = std::move(namespaced); + } + } + } + + if (entity_id.empty()) { + RCLCPP_DEBUG(HandlerContext::logger(), + "SSE fault event: no entity match for reporting source '%s' (fault_code='%s'); " + "omitting x-medkit extension", + raw_fqn.c_str(), fault.fault_code.c_str()); + return std::nullopt; + } + + return EntityContext{"apps", std::move(entity_id)}; +} + } // namespace handlers } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp b/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp index c840651b..2d0602bf 100644 --- a/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp +++ b/src/ros2_medkit_gateway/test/test_sse_fault_handler.cpp @@ -28,7 +28,9 @@ #include #include "ros2_medkit_gateway/core/config.hpp" +#include "ros2_medkit_gateway/core/discovery/models/app.hpp" #include "ros2_medkit_gateway/core/http/sse_client_tracker.hpp" +#include "ros2_medkit_gateway/core/models/thread_safe_entity_cache.hpp" #include "ros2_medkit_gateway/fault_manager_paths.hpp" #include "ros2_medkit_gateway/gateway_node.hpp" #include "ros2_medkit_gateway/http/handlers/handler_context.hpp" @@ -37,10 +39,12 @@ using json = nlohmann::json; using namespace std::chrono_literals; +using ros2_medkit_gateway::App; using ros2_medkit_gateway::AuthConfig; using ros2_medkit_gateway::CorsConfig; using ros2_medkit_gateway::GatewayNode; using ros2_medkit_gateway::SSEClientTracker; +using ros2_medkit_gateway::ThreadSafeEntityCache; using ros2_medkit_gateway::TlsConfig; using ros2_medkit_gateway::handlers::HandlerContext; using ros2_medkit_gateway::handlers::SSEFaultHandler; @@ -415,6 +419,199 @@ TEST_F(SSEFaultHandlerTest, NonPositiveKeepaliveOverrideLogsWarning) { EXPECT_NE(logs.find("Non-positive SSE keepalive override"), std::string::npos); } +TEST_F(SSEFaultHandlerTest, StreamOmitsXMedkitWhenNoMatchingApp) { + // Empty cache: reporting source ("/apps/temp_sensor") has no manifest mapping + // and no App with id "temp_sensor" exists in runtime cache, so the + // ``x-medkit`` payload extension is not emitted (consumer falls back to + // discovery). + enqueue_event(make_fault_event(FaultEvent::EVENT_CONFIRMED, "NO_OWNER", 10)); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + EXPECT_FALSE(payload.contains("x-medkit")); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamEmitsXMedkitForRuntimeApp) { + // Runtime_only fallback: synthetic App with id matching the reporting + // source's last segment exists in the cache, so the ``x-medkit`` payload + // extension points consumers straight at + // /apps/temp_sensor/bulk-data/rosbags/. + App app; + app.id = "temp_sensor"; + app.name = "temp_sensor"; + app.source = "heuristic"; + app.bound_fqn = "/apps/temp_sensor"; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_apps({app}); + + enqueue_event(make_fault_event(FaultEvent::EVENT_CONFIRMED, "WITH_OWNER", 20)); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_type"], "apps"); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "temp_sensor"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamEmitsXMedkitForManifestApp) { + // Manifest/hybrid mode: linking populates node_to_app with the ROS FQN -> + // manifest app id mapping. The fault's reporting source uses the ROS FQN + // form (with leading slash), but the manifest app id can be arbitrary (here + // "diagnostic-bridge" with a hyphen, to model the FQN/id naming mismatch + // called out in #380). + App app; + app.id = "diagnostic-bridge"; + app.name = "diagnostic_bridge"; + app.source = "manifest"; + app.bound_fqn = "/bridge/diagnostic_bridge"; + std::unordered_map node_to_app{{"/bridge/diagnostic_bridge", "diagnostic-bridge"}}; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_all({}, {}, {app}, {}, node_to_app); + + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "BRIDGE_FAULT", 30); + event.fault.reporting_sources = {"/bridge/diagnostic_bridge"}; + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_type"], "apps"); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "diagnostic-bridge"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamSnapshotsEntityContextAtEnqueue) { + // The core design claim of #380: entity resolution is pinned at enqueue + // time so a discovery refresh between enqueue and stream replay cannot + // flip the entity reported to consumers. Enqueue with a matching App in + // the cache, then mutate the cache to remove it, then replay - the + // x-medkit object must still reflect the snapshot. + App app; + app.id = "temp_sensor"; + app.name = "temp_sensor"; + app.source = "heuristic"; + app.bound_fqn = "/apps/temp_sensor"; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_apps({app}); + + enqueue_event(make_fault_event(FaultEvent::EVENT_CONFIRMED, "SNAPSHOTTED", 50)); + + // Discovery refresh: remove the App. Without snapshot-at-enqueue, a + // replay would now see an empty cache and omit x-medkit. + cache.update_apps({}); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_type"], "apps"); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "temp_sensor"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamResolvesManifestFqnWithoutLeadingSlash) { + // gateway_node's node_resolver tries both FQN forms; the linking layer may + // store keys without the leading '/' (e.g. when the manifest declares + // namespace-only FQNs). The handler must try the slash-stripped form so + // /bridge/diagnostic_bridge resolves against a node_to_app key of + // "bridge/diagnostic_bridge". + App app; + app.id = "diagnostic-bridge"; + app.name = "diagnostic_bridge"; + app.source = "manifest"; + app.bound_fqn = "/bridge/diagnostic_bridge"; + std::unordered_map node_to_app{{"bridge/diagnostic_bridge", "diagnostic-bridge"}}; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_all({}, {}, {app}, {}, node_to_app); + + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "SLASH_STRIP", 60); + event.fault.reporting_sources = {"/bridge/diagnostic_bridge"}; + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "diagnostic-bridge"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamResolvesRuntimeCollisionRenamedApp) { + // Runtime discovery renames collision-prone nodes to _ + // (see ros2_runtime_introspection.cpp). The fallback must also try this + // form so faults from those apps still get a usable x-medkit context. + App app; + app.id = "bridge_diagnostic_bridge"; + app.name = "diagnostic_bridge"; + app.source = "heuristic"; + app.bound_fqn = "/bridge/diagnostic_bridge"; + auto & cache = const_cast(node_->get_thread_safe_cache()); + cache.update_apps({app}); + + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "COLLISION", 70); + event.fault.reporting_sources = {"/bridge/diagnostic_bridge"}; + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + ASSERT_TRUE(payload.contains("x-medkit")) << payload.dump(); + EXPECT_EQ(payload["x-medkit"]["entity_id"], "bridge_diagnostic_bridge"); + + release_stream(res); +} + +TEST_F(SSEFaultHandlerTest, StreamOmitsXMedkitWhenReportingSourcesEmpty) { + auto event = make_fault_event(FaultEvent::EVENT_CONFIRMED, "ORPHAN", 40); + event.fault.reporting_sources.clear(); + enqueue_event(event); + + auto req = make_stream_request("127.0.0.1"); + httplib::Response res; + handler_->handle_stream(req, res); + + auto output = read_stream_once(res, 1); + auto payload = parse_sse_payload(output); + + EXPECT_FALSE(payload.contains("x-medkit")); + + release_stream(res); +} + TEST_F(SSEFaultHandlerTest, DisconnectReleasesTrackedClientSlot) { auto req = make_stream_request("127.0.0.1"); httplib::Response res;