Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/api/rest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2372,7 +2372,12 @@ Other extensions beyond SOVD:

- Vendor extension fields using ``x-medkit`` prefix (per SOVD extension mechanism)
- ``DELETE /faults`` - Clear all faults globally
- ``GET /faults/stream`` - SSE real-time fault notifications
- ``GET /faults/stream`` - SSE real-time fault notifications. Each event payload carries an
optional ``x-medkit`` SOVD payload-extension object with ``entity_type`` and ``entity_id``
fields when the gateway can resolve the fault's first reporting source back to an entity,
so consumers can hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` directly
without enumerating entities. Resolution is snapshotted at event arrival; the entire
``x-medkit`` object is omitted when no entity can be resolved.
- ``/health`` - Health check with discovery pipeline diagnostics
- ``/version-info`` - Gateway version information
- ``/docs`` - OpenAPI capability description
Expand Down
1 change: 1 addition & 0 deletions src/ros2_medkit_gateway/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Unreleased

**Features:**

* ``GET /api/v1/faults/stream`` event payloads now carry an optional ``x-medkit`` SOVD payload-extension object with ``entity_type`` and ``entity_id`` fields. When the gateway can resolve the fault's first reporting source back to a SOVD entity (via the manifest-mode linking index, or a runtime-mode last-segment match against an existing App), consumers can hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}`` directly instead of HEAD-probing every entity. Resolution is snapshotted at event arrival, so a discovery refresh between enqueue and stream-out cannot retroactively change the entity reported to consumers. The ``x-medkit`` object is omitted entirely when no entity can be resolved, so existing SSE consumers ignore the addition (`#380 <https://github.com/selfpatch/ros2_medkit/issues/380>`_)
* Plugin API version bumped to v7. Adds ``PluginContext::notify_entities_changed(EntityChangeScope)`` lifecycle hook for plugins that mutate the entity surface at runtime; default no-op keeps v6 source code compiling unchanged against v7 headers. Binary compatibility is not provided: the plugin loader uses a strict equality check on ``plugin_api_version()``, so out-of-tree plugins must be recompiled (`#376 <https://github.com/selfpatch/ros2_medkit/issues/376>`_)
* New ``discovery.manifest.fragments_dir`` parameter: gateway scans the directory for ``*.yaml`` / ``*.yml`` fragment files on every manifest load / reload and merges apps, components, and functions on top of the base manifest. Fragments are forbidden from declaring top-level ``areas``, ``metadata``, ``discovery``, ``scripts``, ``capabilities``, or ``lock_overrides`` - those stay in the base manifest. Presence of any forbidden key (including empty-valued ones like ``areas: []``) is reported as a ``FRAGMENT_FORBIDDEN_FIELD`` validation error that fails the load / reload. Unknown top-level keys (typos such as ``app:`` vs ``apps:``) are ignored with a warning log. Files merged in alphabetical order for deterministic duplicate-id errors (`#376 <https://github.com/selfpatch/ros2_medkit/issues/376>`_)
* Fragment files are size-capped at 1 MiB (``ManifestParser::kMaxFragmentBytes``) before being read into memory, and any symlink resolving outside the canonical ``fragments_dir`` is skipped with a warning, so misconfigurations or symlink-based escapes cannot hand arbitrary bytes to the YAML parser (`#376 <https://github.com/selfpatch/ros2_medkit/issues/376>`_)
Expand Down
7 changes: 5 additions & 2 deletions src/ros2_medkit_gateway/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ Real-time fault event stream using Server-Sent Events (SSE). Clients receive ins
- **Automatic reconnection**: Supports `Last-Event-ID` header for seamless reconnection
- **Keepalive**: Sends `:keepalive` comment every 30 seconds to prevent timeouts
- **Event buffer**: Buffers up to 100 recent events for reconnecting clients
- **Entity context (SOVD payload extension)**: When the gateway can resolve the fault's first reporting source back to an entity, the payload carries an `x-medkit` object with `entity_type` and `entity_id` fields so consumers can hit `/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}` directly without enumerating entities

**Event Types:**
- `fault_confirmed` - Fault transitioned to CONFIRMED status
Expand All @@ -781,13 +782,15 @@ curl -N http://localhost:8080/api/v1/faults/stream

id: 1
event: fault_confirmed
data: {"event_type":"fault_confirmed","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830000.123}
data: {"event_type":"fault_confirmed","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830000.123,"x-medkit":{"entity_type":"apps","entity_id":"motor_controller"}}

id: 2
event: fault_cleared
data: {"event_type":"fault_cleared","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830060.456}
data: {"event_type":"fault_cleared","fault":{"fault_code":"MOTOR_OVERHEAT",...},"timestamp":1735830060.456,"x-medkit":{"entity_type":"apps","entity_id":"motor_controller"}}
```

**SOVD payload extension `x-medkit.entity_*`** (non-standard, SOVD-compatible): Resolution is best-effort and snapshotted at event arrival. Manifest and hybrid discovery use the linking result; runtime-only discovery falls back to the FQN's last segment and only emits the fields when an App with that ID exists in the cache. When no entity can be resolved (no reporting sources, orphan source, etc.), the entire `x-medkit` object is omitted and the consumer must fall back to discovery.

**Response (503 Service Unavailable - Client Limit Reached):**
```json
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>

#include <nlohmann/json.hpp>

#include "rclcpp/rclcpp.hpp"
#include "ros2_medkit_gateway/core/http/sse_client_tracker.hpp"
#include "ros2_medkit_gateway/http/handlers/handler_context.hpp"
Expand Down Expand Up @@ -111,8 +114,33 @@ class SSEFaultHandler {
/// Callback for fault events from ROS 2 topic
void on_fault_event(const ros2_medkit_msgs::msg::FaultEvent::ConstSharedPtr & msg);

/// Resolved owning entity for a fault. Populates the ``x-medkit`` SOVD
/// payload-extension object on outgoing SSE events.
struct EntityContext {
std::string type;
std::string id;
};

/// Buffered queue entry. ``entity`` is resolved at enqueue time so a
/// discovery refresh between enqueue and stream-out cannot retroactively
/// flip the entity reported to consumers.
struct QueuedEvent {
uint64_t id;
ros2_medkit_msgs::msg::FaultEvent event;
std::optional<EntityContext> entity;
};

/// Format a fault event as SSE message
static std::string format_sse_event(const ros2_medkit_msgs::msg::FaultEvent & event, uint64_t event_id);
static std::string format_sse_event(const QueuedEvent & queued);

/// Resolve the owning entity for a fault, snapshotting the cache. Manifest
/// / hybrid mode uses the cache's node-to-app index; runtime mode falls
/// back to the FQN's last segment, and for collision-disambiguated runtime
/// apps also to ``<ns_prefix>_<name>`` per
/// ros2_runtime_introspection.cpp's renaming rule. Only emits a value when
/// an App with the resolved id actually exists in the cache - returns
/// ``std::nullopt`` otherwise so the consumer falls back to discovery.
std::optional<EntityContext> resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fault) const;

HandlerContext & ctx_;
std::shared_ptr<SSEClientTracker> client_tracker_;
Expand All @@ -123,7 +151,7 @@ class SSEFaultHandler {
/// Event queue for broadcasting to clients
mutable std::mutex queue_mutex_;
std::condition_variable queue_cv_;
std::deque<std::pair<uint64_t, ros2_medkit_msgs::msg::FaultEvent>> event_queue_;
std::deque<QueuedEvent> event_queue_;

/// Monotonically increasing event ID for Last-Event-ID support
std::atomic<uint64_t> next_event_id_{1};
Expand Down
100 changes: 85 additions & 15 deletions src/ros2_medkit_gateway/src/http/handlers/sse_fault_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,17 @@ void SSEFaultHandler::request_shutdown() {
void SSEFaultHandler::on_fault_event(const ros2_medkit_msgs::msg::FaultEvent::ConstSharedPtr & msg) {
uint64_t event_id = next_event_id_.fetch_add(1);

// Snapshot entity context before acquiring the queue lock so cache state
// is pinned to the fault arrival timestamp and the formatting path stays
// lock-free with respect to the cache.
auto entity = resolve_entity_context(msg->fault);

std::size_t dropped_this_call = 0;
{
std::lock_guard<std::mutex> lock(queue_mutex_);

// Add event to queue
event_queue_.emplace_back(event_id, *msg);
// Add event to queue with resolved entity context
event_queue_.push_back(QueuedEvent{event_id, *msg, std::move(entity)});

// Trim old events if buffer is full
while (event_queue_.size() > kMaxBufferedEvents) {
Expand Down Expand Up @@ -156,13 +161,13 @@ void SSEFaultHandler::handle_stream(const httplib::Request & req, httplib::Respo
// First, send any buffered events the client missed (for reconnection)
{
std::lock_guard<std::mutex> lock(queue_mutex_);
for (const auto & [id, event] : event_queue_) {
if (id > last_event_id) {
std::string sse_msg = format_sse_event(event, id);
for (const auto & queued : event_queue_) {
if (queued.id > last_event_id) {
std::string sse_msg = format_sse_event(queued);
if (!sink.write(sse_msg.data(), sse_msg.size())) {
return false; // Client disconnected
}
last_event_id = id;
last_event_id = queued.id;
}
}
}
Expand Down Expand Up @@ -198,15 +203,15 @@ void SSEFaultHandler::handle_stream(const httplib::Request & req, httplib::Respo

// Check for new events
bool found_new = false;
for (const auto & [id, event] : event_queue_) {
if (id > last_event_id) {
std::string sse_msg = format_sse_event(event, id);
for (const auto & queued : event_queue_) {
if (queued.id > last_event_id) {
std::string sse_msg = format_sse_event(queued);
lock.unlock();
if (!sink.write(sse_msg.data(), sse_msg.size())) {
return false; // Client disconnected
}
lock.lock();
last_event_id = id;
last_event_id = queued.id;
found_new = true;
}
}
Expand All @@ -230,24 +235,89 @@ size_t SSEFaultHandler::connected_clients() const {
return client_tracker_->connected_clients();
}

std::string SSEFaultHandler::format_sse_event(const ros2_medkit_msgs::msg::FaultEvent & event, uint64_t event_id) {
const auto sanitized_event_type = sanitize_sse_event_type(event.event_type);
std::string SSEFaultHandler::format_sse_event(const QueuedEvent & queued) {
const auto sanitized_event_type = sanitize_sse_event_type(queued.event.event_type);

nlohmann::json json_event;
json_event["event_type"] = sanitized_event_type;
json_event["fault"] = ros2::conversions::fault_to_json(event.fault);
json_event["fault"] = ros2::conversions::fault_to_json(queued.event.fault);

// Convert timestamp to seconds with nanosecond precision
double timestamp_sec = static_cast<double>(event.timestamp.sec) + static_cast<double>(event.timestamp.nanosec) * 1e-9;
double timestamp_sec =
static_cast<double>(queued.event.timestamp.sec) + static_cast<double>(queued.event.timestamp.nanosec) * 1e-9;
json_event["timestamp"] = timestamp_sec;

// SOVD payload extension: nest ``entity_type`` / ``entity_id`` under the
// ``x-medkit`` response-extension object so global-stream consumers can
// hit ``/{entity_type}/{entity_id}/bulk-data/rosbags/{fault_code}``
// directly instead of HEAD-probing every entity. Flat ``x-medkit-*``
// names are reserved for endpoint paths (``/x-medkit-graph``) and error
// codes, not payload fields.
if (queued.entity) {
json_event["x-medkit"] = {{"entity_type", queued.entity->type}, {"entity_id", queued.entity->id}};
}

std::ostringstream sse;
sse << "id: " << event_id << "\n";
sse << "id: " << queued.id << "\n";
sse << "event: " << sanitized_event_type << "\n";
sse << "data: " << json_event.dump() << "\n\n";

return sse.str();
}

std::optional<SSEFaultHandler::EntityContext>
SSEFaultHandler::resolve_entity_context(const ros2_medkit_msgs::msg::Fault & fault) const {
if (fault.reporting_sources.empty()) {
return std::nullopt;
}
const auto & raw_fqn = fault.reporting_sources.front();
if (raw_fqn.empty()) {
return std::nullopt;
}

const auto & cache = ctx_.node()->get_thread_safe_cache();

// Manifest / hybrid mode: the linking step populated node_to_app with the
// ROS FQN -> manifest app id mapping. Try both FQN forms (with and without
// the leading '/'), mirroring gateway_node's node_resolver lambda.
std::string entity_id = cache.resolve_node_to_app(raw_fqn);
if (entity_id.empty() && raw_fqn.front() == '/') {
entity_id = cache.resolve_node_to_app(raw_fqn.substr(1));
}

// Runtime fallback: synthetic apps are created with id = ROS node name
// (the FQN's last segment) when there is no namespace collision, or
// ``<ns_prefix>_<name>`` (slashes in the namespace replaced with '_') when
// multiple nodes share the same name. See ros2_runtime_introspection.cpp.
// Only accept a candidate that actually exists as an App in the cache so
// we never point consumers at a 404.
if (entity_id.empty()) {
auto last_slash = raw_fqn.rfind('/');
auto name = (last_slash != std::string::npos) ? raw_fqn.substr(last_slash + 1) : raw_fqn;
if (!name.empty() && cache.has_app(name)) {
entity_id = std::move(name);
} else if (last_slash != std::string::npos && last_slash > 0) {
// Try the collision-disambiguated form: ns prefix (sans leading '/'),
// slashes replaced with '_', then '_' + name.
auto ns_prefix = raw_fqn.substr(1, last_slash - 1);
std::replace(ns_prefix.begin(), ns_prefix.end(), '/', '_');
auto namespaced = ns_prefix + "_" + name;
if (cache.has_app(namespaced)) {
entity_id = std::move(namespaced);
}
}
}

if (entity_id.empty()) {
RCLCPP_DEBUG(HandlerContext::logger(),
"SSE fault event: no entity match for reporting source '%s' (fault_code='%s'); "
"omitting x-medkit extension",
raw_fqn.c_str(), fault.fault_code.c_str());
return std::nullopt;
}

return EntityContext{"apps", std::move(entity_id)};
}

} // namespace handlers
} // namespace ros2_medkit_gateway
Loading
Loading