diff --git a/.vscode/mcp.json b/.vscode/mcp.json index 2c8d1a99..7e695976 100644 --- a/.vscode/mcp.json +++ b/.vscode/mcp.json @@ -3,6 +3,18 @@ "aimdb-weather": { "type": "http", "url": "http://aimdb.dev/mcp" + }, + "aimdb-local": { + "type": "stdio", + "command": "cargo", + "args": [ + "run", + "--manifest-path", + "${workspaceFolder}/tools/aimdb-mcp/Cargo.toml", + "--", + "--socket", + "/tmp/aimdb-demo.sock" + ] } } } \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index b06e6c1c..3be47b43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Automatic stage profiling (Issue #58, RFC 014)**: New `profiling` feature on `aimdb-core` automatically measures wall-clock time per `.source()` / `.tap()` / `.link()` stage and exposes `call_count` / `total` / `avg` / `min` / `max` counters per stage. Name stages with `RecordRegistrar::with_name("...")`. Works on `no_std + alloc` via the runtime clock (`aimdb_executor::TimeOps`) and `portable-atomic` for 64-bit atomics on `thumbv7em`. New MCP tools `get_stage_profiling` (with bottleneck detection + recommendation) and `reset_stage_profiling`. Feature is off by default and zero-cost when disabled. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md), [aimdb-client](aimdb-client/CHANGELOG.md), [tools/aimdb-mcp](tools/aimdb-mcp/CHANGELOG.md)) +- **`remote-access-demo` exercises stage profiling**: The `Temperature` and `SystemStatus` records now run from in-AimDB `.source()` + `.tap()` tasks (with `.with_name(...)` stage labels) and the demo enables the `profiling` feature. `SystemStatus.slow_status_processor` deliberately sleeps 100 ms per value so `get_stage_profiling` flags it as the bottleneck. README documents how to query and reset profiling via MCP / `socat`. - **`hello-mailbox` example (Issue #94)**: New minimal example demonstrating the Mailbox buffer (latest-wins) semantics using the synchronous API. First community contribution from [@ggmaldo](https://github.com/ggmaldo) — see [examples/hello-mailbox/](examples/hello-mailbox/). - **Writer-exclusivity validation (Issue #89)**: Combining `.source()`, `.transform()`, and `.link_from()` on the same record now panics at configuration time with a clear message instead of silently producing a last-writer-wins race on the buffer. Multiple `.link_from()` inbound connectors (fan-in) remain allowed. ([aimdb-core](aimdb-core/CHANGELOG.md)) - **`no_std` Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets — no longer Tokio-only. Multi-input join fan-in moved out of `aimdb-core` into the new `JoinFanInRuntime` traits in `aimdb-executor`, with implementations in the Tokio (`mpsc::channel`, capacity 64), Embassy (`embassy_sync::Channel`, capacity 8), and WASM (`futures_channel::mpsc`, capacity 64) adapters. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md)) @@ -51,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- **aimdb-executor**: `TimeOps` trait gained a required `duration_as_nanos(duration) -> u64` method (runtime-agnostic numeric representation of elapsed time, used by stage profiling). Implemented in tokio, embassy, and wasm adapters. ([aimdb-executor](aimdb-executor/CHANGELOG.md)) - **aimdb-embassy-adapter**: `SpmcRing` subscriber-slot exhaustion now emits a `defmt::error!` with guidance to increase the `CONSUMERS` const generic. Counting rule: one slot per `.tap()`, `.link_to()`, and `transform_join` input. - **aimdb-codegen**: Generated join handler stubs updated to the new `on_triggers` task model (`async fn task_handler(JoinEventRx, Producer<...>)`). - **aimdb-core**: Breaking API changes to `InboundConnectorLink`, `Router`, and `RouterBuilder` to support `DeserializerKind` (see [aimdb-core/CHANGELOG.md](aimdb-core/CHANGELOG.md)) diff --git a/Cargo.lock b/Cargo.lock index d260a656..9ce92809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -977,6 +977,8 @@ version = "0.1.2" name = "embassy-hal-internal" version = "0.5.0" dependencies = [ + "aligned", + "as-slice", "cortex-m", "critical-section", "defmt 1.0.1", @@ -2372,6 +2374,9 @@ name = "portable-atomic" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" +dependencies = [ + "critical-section", +] [[package]] name = "potential_utf" @@ -3023,7 +3028,7 @@ dependencies = [ [[package]] name = "stm32-metapac" version = "21.0.0" -source = "git+https://github.com/embassy-rs/stm32-data-generated?tag=stm32-data-5261d86838979da64855b1860a6f3ea8ef90746f#9fc43c9d87d9a0cde91089bd7532fed520873482" +source = "git+https://github.com/embassy-rs/stm32-data-generated?tag=stm32-data-fe91bec726df47c6467dafce3fcf64075f662a3c#1569e3dc33adb6924329da79ef7dc8ce64bc791d" dependencies = [ "cortex-m", "cortex-m-rt", diff --git a/Makefile b/Makefile index 22d6f753..aed29f65 100644 --- a/Makefile +++ b/Makefile @@ -57,8 +57,14 @@ build: cargo build --package aimdb-core --no-default-features --features alloc @printf "$(YELLOW) → Building aimdb-core (std platform)$(NC)\n" cargo build --package aimdb-core --features "std,tracing,metrics" + @printf "$(YELLOW) → Building aimdb-core (no_std + alloc + profiling)$(NC)\n" + cargo build --package aimdb-core --no-default-features --features "alloc,profiling" + @printf "$(YELLOW) → Building aimdb-core (std + profiling)$(NC)\n" + cargo build --package aimdb-core --features "std,tracing,profiling" @printf "$(YELLOW) → Building tokio adapter$(NC)\n" cargo build --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics" + @printf "$(YELLOW) → Building tokio adapter (with profiling)$(NC)\n" + cargo build --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling" @printf "$(YELLOW) → Building sync wrapper$(NC)\n" cargo build --package aimdb-sync @printf "$(YELLOW) → Building codegen library$(NC)\n" @@ -90,12 +96,18 @@ test: cargo test --package aimdb-core --features "std,tracing" @printf "$(YELLOW) → Testing aimdb-core (std + metrics)$(NC)\n" cargo test --package aimdb-core --features "std,tracing,metrics" + @printf "$(YELLOW) → Testing aimdb-core (std + profiling)$(NC)\n" + cargo test --package aimdb-core --features "std,tracing,profiling" + @printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + profiling)$(NC)\n" + cargo test --package aimdb-core --no-default-features --features "alloc,profiling" @printf "$(YELLOW) → Testing aimdb-core remote module$(NC)\n" cargo test --package aimdb-core --lib --features "std" remote:: @printf "$(YELLOW) → Testing tokio adapter$(NC)\n" cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing" @printf "$(YELLOW) → Testing tokio adapter (with metrics)$(NC)\n" cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics" + @printf "$(YELLOW) → Testing tokio adapter (with profiling)$(NC)\n" + cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling" @printf "$(YELLOW) → Testing sync wrapper$(NC)\n" cargo test --package aimdb-sync @printf "$(YELLOW) → Testing codegen library$(NC)\n" @@ -243,6 +255,8 @@ test-embedded: cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime" @printf "$(YELLOW) → Checking aimdb-embassy-adapter with network support on thumbv7em-none-eabihf target$(NC)\n" cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,embassy-net-support" + @printf "$(YELLOW) → Checking aimdb-embassy-adapter with profiling on thumbv7em-none-eabihf target$(NC)\n" + cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,profiling" @printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy) on thumbv7em-none-eabihf target$(NC)\n" cargo check --package aimdb-mqtt-connector --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime" @printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy + defmt) on thumbv7em-none-eabihf target$(NC)\n" diff --git a/_external/embassy b/_external/embassy index f8fd6ec9..f1c6bb7d 160000 --- a/_external/embassy +++ b/_external/embassy @@ -1 +1 @@ -Subproject commit f8fd6ec924b7d73fcee9ce8e97e1dba47200dac2 +Subproject commit f1c6bb7df80e592b7a7edc709f546771506806d3 diff --git a/aimdb-client/CHANGELOG.md b/aimdb-client/CHANGELOG.md index 4f7a0123..826c3280 100644 --- a/aimdb-client/CHANGELOG.md +++ b/aimdb-client/CHANGELOG.md @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -No changes yet. +### Added + +- **`AimxClient::reset_stage_profiling()`** (Issue #58): New method issuing the `profiling.reset` AimX request to clear stage profiling counters for every record on the server. Requires the server to be built with the `profiling` feature and the connection to have write permission. ## [0.5.0] - 2026-02-21 diff --git a/aimdb-client/src/connection.rs b/aimdb-client/src/connection.rs index 2f38f29a..1549a41d 100644 --- a/aimdb-client/src/connection.rs +++ b/aimdb-client/src/connection.rs @@ -124,6 +124,14 @@ impl AimxClient { Ok(records) } + /// Reset stage profiling counters for every record on the server. + /// + /// Requires the server to be built with the `profiling` feature and the + /// connection to have write permission. + pub async fn reset_stage_profiling(&mut self) -> ClientResult { + self.send_request("profiling.reset", None).await + } + /// Get current value of a record pub async fn get_record(&mut self, name: &str) -> ClientResult { let params = json!({ "record": name }); diff --git a/aimdb-core/CHANGELOG.md b/aimdb-core/CHANGELOG.md index 752ac1ec..902ffe8d 100644 --- a/aimdb-core/CHANGELOG.md +++ b/aimdb-core/CHANGELOG.md @@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Automatic stage profiling (Issue #58, RFC 014, feature `profiling`)**: AimDB now measures wall-clock time per `.source()`, `.tap()`, and `.link()` stage with no user instrumentation. Feature is off by default and adds zero overhead when disabled; `alloc` + a runtime clock is enough, so it works on `no_std + alloc` targets too. + - New `profiling` module exporting `StageMetrics` (atomic `call_count` / `total_time_ns` / `avg_time_ns` / `min_time_ns` / `max_time_ns` counters), `RecordProfilingMetrics` per-record container, and serializable `StageProfilingInfo` snapshot. + - Source-stage timing measures the interval between successive `Producer::produce()` calls via a new `ProducerProfilingState`. Tap- and link-stage timing wraps the `BufferReader` returned by `Consumer::subscribe()` in a new `ProfilingBufferReader` that times the interval between successive `recv()` yields. The whole-task closure shape of `.source()` / `.tap()` is preserved — no per-value handler changes. + - `RecordRegistrar::with_name("...")` assigns a human-readable name to the most recently registered source/tap/link; surfaces in MCP output. Always callable — a no-op when the feature is disabled. + - New `StageKind` enum (`Source` / `Tap` / `Link` / `Transform`); `.transform()` is stubbed for future instrumentation. + - `RecordMetadata` gains an optional `stage_profiling: Vec` field (feature-gated) attached automatically in `TypedRecord::collect_metadata`. New helper `RecordMetadata::with_stage_profiling`. + - `AimDb::reset_stage_profiling()` clears every record's counters. New `profiling.reset` AimX RPC method (write-permission gated) wired through `remote::handler`. + - New `RuntimeForProfiling` marker trait — blanket-implemented for every `R` when the feature is off, requires `aimdb_executor::TimeOps` when on. Surfaces on `AimDbBuilder::run` / `build` and `AimDb::build_with`. Public API is unchanged when the feature is disabled. + - New `Time::duration_as_nanos` accessor on the context (delegates to `TimeOps`). + - Dependency: `portable-atomic` (with the `fallback` + `critical-section` features enabled by the `profiling` feature) for 64-bit-atomic emulation on targets without native `AtomicU64` (e.g. `thumbv7em-none-eabihf`). - **Writer-exclusivity validation for `.link_from()` (Issue #89)**: `.source()`, `.transform()`, and `.link_from()` are now mutually exclusive on a single record — combining any two now panics at configuration time instead of silently racing on the buffer (last-writer-wins). The check fires from `LinkFromBuilder::finish()` (panic message includes the offending URL), with symmetric defense-in-depth checks added to `TypedRecord::set_producer_service`, `set_transform`, and `add_inbound_connector`. Multiple `.link_from()` calls on the same record (fan-in) remain permitted. - **`no_std` support for the full Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets. Multi-input join fan-in is no longer hardcoded to `tokio::sync::mpsc`; it uses the runtime-agnostic `JoinFanInRuntime` traits from `aimdb-executor`, implemented by Tokio, Embassy, and WASM adapters. - **`JoinEventRx`** — type-erased trigger receiver passed to the `on_triggers` handler. Call `.recv().await` in a loop to consume `JoinTrigger` events from all input forwarders. diff --git a/aimdb-core/Cargo.toml b/aimdb-core/Cargo.toml index b3cb24d7..4aaaa2c9 100644 --- a/aimdb-core/Cargo.toml +++ b/aimdb-core/Cargo.toml @@ -36,6 +36,13 @@ tracing = ["dep:tracing"] # Works in both std and no_std environments defmt = ["dep:defmt"] # Embedded logging via probe (no_std) metrics = ["dep:metrics", "std"] # Requires std for aggregation +# Automatic stage profiling (.source()/.tap()/.link() timing). +# Independent of `metrics`; works in no_std (only needs heap + a runtime clock). +# `portable-atomic/critical-section` provides the 64-bit-atomic fallback on targets +# without native 64-bit atomics (e.g. thumbv7em); it's a no-op where native atomics +# exist, and embedded binaries already supply a `critical-section` impl. +profiling = ["alloc", "portable-atomic/fallback", "portable-atomic/critical-section"] + # Testing features test-utils = ["std"] diff --git a/aimdb-core/src/builder.rs b/aimdb-core/src/builder.rs index 2997a9cc..effb4979 100644 --- a/aimdb-core/src/builder.rs +++ b/aimdb-core/src/builder.rs @@ -589,6 +589,7 @@ where #[cfg(feature = "alloc")] record_key: record_key.as_str().to_string(), extensions: &self.extensions, + last_stage: None, }; f(&mut reg); @@ -675,7 +676,10 @@ where /// .run().await // Runs forever /// } /// ``` - pub async fn run(self) -> DbResult<()> { + pub async fn run(self) -> DbResult<()> + where + R: crate::RuntimeForProfiling, + { #[cfg(feature = "tracing")] tracing::info!("Building database and spawning background tasks..."); @@ -728,7 +732,10 @@ where /// # Returns /// `DbResult>` - The database instance #[cfg_attr(not(feature = "std"), allow(unused_mut))] - pub async fn build(self) -> DbResult> { + pub async fn build(self) -> DbResult> + where + R: crate::RuntimeForProfiling, + { use crate::DbError; // Validate all records @@ -836,9 +843,14 @@ where extensions: self.extensions, }); + #[cfg(feature = "profiling")] + let profiling_clock = crate::profiling::make_clock(runtime.clone()); + let db = Arc::new(AimDb { inner: inner.clone(), runtime: runtime.clone(), + #[cfg(feature = "profiling")] + profiling_clock, }); #[cfg(feature = "tracing")] @@ -1016,6 +1028,10 @@ pub struct AimDb { /// Runtime adapter with concrete type runtime: Arc, + + /// Shared wall clock for stage profiling, built from the runtime at `build()` time. + #[cfg(feature = "profiling")] + profiling_clock: crate::profiling::Clock, } impl Clone for AimDb { @@ -1023,6 +1039,8 @@ impl Clone for AimDb { Self { inner: self.inner.clone(), runtime: self.runtime.clone(), + #[cfg(feature = "profiling")] + profiling_clock: self.profiling_clock.clone(), } } } @@ -1050,8 +1068,17 @@ impl AimDb { &self.inner.extensions } + /// Shared wall clock used by stage profiling (nanoseconds since an arbitrary epoch). + #[cfg(feature = "profiling")] + pub(crate) fn profiling_clock(&self) -> &crate::profiling::Clock { + &self.profiling_clock + } + /// Builds a database with a closure-based builder pattern - pub async fn build_with(rt: Arc, f: impl FnOnce(&mut AimDbBuilder)) -> DbResult<()> { + pub async fn build_with(rt: Arc, f: impl FnOnce(&mut AimDbBuilder)) -> DbResult<()> + where + R: crate::RuntimeForProfiling, + { let mut b = AimDbBuilder::new().runtime(rt); f(&mut b); b.run().await @@ -1238,6 +1265,14 @@ impl AimDb { self.inner.list_records() } + /// Resets stage profiling counters for every record (feature `profiling`). + #[cfg(feature = "profiling")] + pub fn reset_stage_profiling(&self) { + for record in &self.inner.storages { + record.reset_profiling(); + } + } + /// Try to get record's latest value as JSON by name (std only) /// /// Convenience wrapper around `AimDbInner::try_latest_as_json()`. diff --git a/aimdb-core/src/context.rs b/aimdb-core/src/context.rs index 10860501..06106622 100644 --- a/aimdb-core/src/context.rs +++ b/aimdb-core/src/context.rs @@ -180,6 +180,11 @@ impl<'a, R: Runtime> Time<'a, R> { pub fn duration_since(&self, later: R::Instant, earlier: R::Instant) -> Option { self.ctx.runtime.duration_since(later, earlier) } + + /// Number of whole nanoseconds in a duration (runtime-agnostic). + pub fn duration_as_nanos(&self, duration: R::Duration) -> u64 { + self.ctx.runtime.duration_as_nanos(duration) + } } /// Log utilities accessor for RuntimeContext diff --git a/aimdb-core/src/lib.rs b/aimdb-core/src/lib.rs index 2d5cc7ce..919606a3 100644 --- a/aimdb-core/src/lib.rs +++ b/aimdb-core/src/lib.rs @@ -28,6 +28,8 @@ mod error; pub mod ext_macros; pub mod extensions; pub mod graph; +#[cfg(feature = "profiling")] +pub mod profiling; pub mod record_id; #[cfg(feature = "std")] pub mod remote; @@ -38,6 +40,25 @@ pub mod transport; pub mod typed_api; pub mod typed_record; +/// Marker trait used to add a `TimeOps` requirement to runtime-agnostic builder +/// methods *only* when the `profiling` feature is enabled. +/// +/// When `profiling` is off this is a blanket no-op (every `R` implements it), so +/// the public API is unchanged. When `profiling` is on it requires +/// [`aimdb_executor::TimeOps`], which every real runtime adapter already provides +/// (it is part of [`aimdb_executor::Runtime`]). +#[cfg(feature = "profiling")] +pub trait RuntimeForProfiling: aimdb_executor::TimeOps {} +#[cfg(feature = "profiling")] +impl RuntimeForProfiling for R {} + +/// See the `profiling`-enabled definition above. Blanket no-op when `profiling` +/// is disabled. +#[cfg(not(feature = "profiling"))] +pub trait RuntimeForProfiling {} +#[cfg(not(feature = "profiling"))] +impl RuntimeForProfiling for R {} + // Public API exports pub use context::RuntimeContext; pub use error::{DbError, DbResult}; @@ -58,9 +79,13 @@ pub use builder::OutboundRoute; pub use builder::{AimDb, AimDbBuilder}; pub use connector::ConnectorBuilder; pub use transport::{Connector, ConnectorConfig, PublishError}; -pub use typed_api::{Consumer, Producer, RecordRegistrar, RecordT}; +pub use typed_api::{Consumer, Producer, RecordRegistrar, RecordT, StageKind}; pub use typed_record::{AnyRecord, AnyRecordExt, TypedRecord}; +// Stage profiling exports (feature-gated) +#[cfg(feature = "profiling")] +pub use profiling::{RecordProfilingMetrics, StageMetrics, StageProfilingInfo}; + // Connector Infrastructure exports pub use connector::TopicResolverFn; pub use connector::{ConnectorClient, ConnectorLink, ConnectorUrl, SerializeError}; diff --git a/aimdb-core/src/profiling/info.rs b/aimdb-core/src/profiling/info.rs new file mode 100644 index 00000000..5b7bfade --- /dev/null +++ b/aimdb-core/src/profiling/info.rs @@ -0,0 +1,67 @@ +//! Serializable snapshot of stage profiling metrics, for remote introspection. + +extern crate alloc; +use alloc::{string::String, vec::Vec}; + +use serde::{Deserialize, Serialize}; + +use crate::profiling::{RecordProfilingMetrics, StageEntry}; + +/// A point-in-time snapshot of one execution stage's timing metrics. +/// +/// Carried in `RecordMetadata::stage_profiling` and exposed by the +/// `get_stage_profiling` MCP tool. All times are wall-clock nanoseconds. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct StageProfilingInfo { + /// Stage kind: `"source"`, `"tap"`, `"link"`, or `"transform"`. + pub stage_type: String, + /// Registration index within the stage kind (0-based). + pub index: usize, + /// Name assigned via `.with_name("...")`, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + /// Number of recorded invocations. + pub call_count: u64, + /// Cumulative wall-clock time, nanoseconds. + pub total_time_ns: u64, + /// Mean wall-clock time per invocation, nanoseconds. + pub avg_time_ns: u64, + /// Fastest recorded invocation, nanoseconds. + pub min_time_ns: u64, + /// Slowest recorded invocation, nanoseconds. + pub max_time_ns: u64, +} + +impl StageProfilingInfo { + fn from_entry(stage_type: &str, index: usize, entry: &StageEntry) -> Self { + let m = &entry.metrics; + Self { + stage_type: String::from(stage_type), + index, + name: entry.name.clone(), + call_count: m.call_count(), + total_time_ns: m.total_time_ns(), + avg_time_ns: m.avg_time_ns(), + min_time_ns: m.min_time_ns(), + max_time_ns: m.max_time_ns(), + } + } +} + +impl RecordProfilingMetrics { + /// Returns a serializable snapshot of every registered stage's metrics, + /// ordered sources → taps → links. + pub fn snapshot(&self) -> Vec { + let mut out = Vec::new(); + for (kind, stages) in [ + ("source", self.sources()), + ("tap", self.taps()), + ("link", self.links()), + ] { + for (i, entry) in stages.iter().enumerate() { + out.push(StageProfilingInfo::from_entry(kind, i, entry)); + } + } + out + } +} diff --git a/aimdb-core/src/profiling/mod.rs b/aimdb-core/src/profiling/mod.rs new file mode 100644 index 00000000..ba3118bd --- /dev/null +++ b/aimdb-core/src/profiling/mod.rs @@ -0,0 +1,141 @@ +//! Automatic stage profiling (feature `profiling`). +//! +//! AimDB owns the execution boundary for user callbacks (`.source()`, `.tap()`, +//! `.link()`), so it can measure their wall-clock execution time without any +//! user instrumentation: +//! +//! * **source** — wall-clock interval between successive `Producer::produce()` calls. +//! * **tap / link** — wall-clock interval from a buffer read yielding a value to +//! the next read (≈ the user's per-value processing time). +//! +//! Timing uses the runtime's own clock ([`aimdb_executor::TimeOps`]), so the +//! feature works on `no_std` targets too (it only needs heap + a clock). +//! +//! Measured time is **wall-clock**, including `.await` points / I/O / sleeps — it +//! answers *which* stage is slow, not *why*. Use `tracing` / `tokio-console` for +//! CPU-vs-I/O analysis. + +mod info; +mod record_profiling; +mod stage_metrics; + +pub use info::StageProfilingInfo; +pub use record_profiling::{RecordProfilingMetrics, StageEntry}; +pub use stage_metrics::StageMetrics; + +extern crate alloc; +use alloc::{boxed::Box, sync::Arc}; +use core::future::Future; +use core::pin::Pin; +use core::sync::atomic::Ordering; +use portable_atomic::AtomicU64; + +use crate::buffer::BufferReader; +use crate::DbError; + +/// A monotonic-ish wall clock that returns nanoseconds since an arbitrary epoch. +/// +/// Built once per record at spawn time from the runtime adapter and shared +/// (cheaply, behind `Arc`) with the per-stage instrumentation. +pub(crate) type Clock = Arc u64 + Send + Sync>; + +/// Builds a [`Clock`] from a runtime adapter. +pub(crate) fn make_clock(rt: Arc) -> Clock { + let epoch = rt.now(); + Arc::new(move || { + let now = rt.now(); + match rt.duration_since(now, epoch.clone()) { + Some(d) => rt.duration_as_nanos(d), + None => 0, + } + }) +} + +/// Sentinel for "no previous `produce()` recorded yet". +const NO_PREV: u64 = u64::MAX; + +/// Per-`Producer` profiling state for a `.source()` stage. +/// +/// Shared (behind `Arc`) by all clones of the `Producer`, so the "time since the +/// last produce" cursor is consistent regardless of which clone produced. +pub(crate) struct ProducerProfilingState { + metrics: Arc, + clock: Clock, + last_produce_ns: AtomicU64, +} + +impl ProducerProfilingState { + pub(crate) fn new(metrics: Arc, clock: Clock) -> Self { + Self { + metrics, + clock, + last_produce_ns: AtomicU64::new(NO_PREV), + } + } + + /// Records one source iteration: the interval since the previous produce. + /// The first call only seeds the cursor (no sample). + pub(crate) fn record_produce(&self) { + let now = (self.clock)(); + let prev = self.last_produce_ns.swap(now, Ordering::Relaxed); + if prev != NO_PREV { + self.metrics.record(now.saturating_sub(prev)); + } + } +} + +/// Wraps a [`BufferReader`] to time how long the consumer spends between reads +/// (≈ per-value processing time for a `.tap()` / `.link()` stage). +pub(crate) struct ProfilingBufferReader { + inner: Box + Send>, + metrics: Arc, + clock: Clock, + /// Wall-clock (ns) at which the last value was handed to the consumer. + last_yield_ns: Option, +} + +impl ProfilingBufferReader { + pub(crate) fn new( + inner: Box + Send>, + metrics: Arc, + clock: Clock, + ) -> Self { + Self { + inner, + metrics, + clock, + last_yield_ns: None, + } + } + + fn on_yield(&mut self, started_ns: u64) { + if let Some(prev) = self.last_yield_ns { + self.metrics.record(started_ns.saturating_sub(prev)); + } + self.last_yield_ns = Some((self.clock)()); + } +} + +impl BufferReader for ProfilingBufferReader { + fn recv(&mut self) -> Pin> + Send + '_>> { + Box::pin(async move { + // `started_ns` ≈ the moment the consumer finished processing the + // previous value and asked for the next one. + let started_ns = (self.clock)(); + let result = self.inner.recv().await; + if result.is_ok() { + self.on_yield(started_ns); + } + result + }) + } + + fn try_recv(&mut self) -> Result { + let started_ns = (self.clock)(); + let result = self.inner.try_recv(); + if result.is_ok() { + self.on_yield(started_ns); + } + result + } +} diff --git a/aimdb-core/src/profiling/record_profiling.rs b/aimdb-core/src/profiling/record_profiling.rs new file mode 100644 index 00000000..014628ed --- /dev/null +++ b/aimdb-core/src/profiling/record_profiling.rs @@ -0,0 +1,178 @@ +//! Per-record container of stage profiling metrics. + +extern crate alloc; +use alloc::{string::String, sync::Arc, vec::Vec}; + +use crate::profiling::StageMetrics; +use crate::StageKind; + +/// One registered stage: its shared metrics plus an optional human-readable name +/// set via `.with_name("...")`. +#[derive(Debug)] +pub struct StageEntry { + /// Shared timing counters for this stage. + pub metrics: Arc, + /// Name assigned via `RecordRegistrar::with_name`, if any. + pub name: Option, +} + +impl StageEntry { + fn new() -> Self { + Self { + metrics: Arc::new(StageMetrics::new()), + name: None, + } + } +} + +/// All stage profiling metrics for a single record, indexed by registration order +/// within each stage kind (`sources[0]` is the first `.source()`, `taps[1]` the +/// second `.tap()`, etc.). +#[derive(Debug, Default)] +pub struct RecordProfilingMetrics { + sources: Vec, + taps: Vec, + links: Vec, + /// Reserved for `.transform()` instrumentation (not yet wired). + #[allow(dead_code)] + transforms: Vec, +} + +impl RecordProfilingMetrics { + /// Creates an empty container. + pub fn new() -> Self { + Self::default() + } + + /// `true` if no stages have been registered. + pub fn is_empty(&self) -> bool { + self.sources.is_empty() + && self.taps.is_empty() + && self.links.is_empty() + && self.transforms.is_empty() + } + + /// Registers a new source stage; returns its index and shared metrics handle. + pub fn push_source(&mut self) -> (usize, Arc) { + Self::push(&mut self.sources) + } + + /// Registers a new tap stage; returns its index and shared metrics handle. + pub fn push_tap(&mut self) -> (usize, Arc) { + Self::push(&mut self.taps) + } + + /// Registers a new link stage; returns its index and shared metrics handle. + pub fn push_link(&mut self) -> (usize, Arc) { + Self::push(&mut self.links) + } + + fn push(vec: &mut Vec) -> (usize, Arc) { + let idx = vec.len(); + let entry = StageEntry::new(); + let metrics = entry.metrics.clone(); + vec.push(entry); + (idx, metrics) + } + + /// The source stage at `idx`, if registered. + pub fn source(&self, idx: usize) -> Option<&StageEntry> { + self.sources.get(idx) + } + + /// The tap stage at `idx`, if registered. + pub fn tap(&self, idx: usize) -> Option<&StageEntry> { + self.taps.get(idx) + } + + /// The link stage at `idx`, if registered. + pub fn link(&self, idx: usize) -> Option<&StageEntry> { + self.links.get(idx) + } + + /// Number of registered tap stages. + pub fn tap_count(&self) -> usize { + self.taps.len() + } + + /// All source stages, in registration order. + pub fn sources(&self) -> &[StageEntry] { + &self.sources + } + + /// All tap stages, in registration order. + pub fn taps(&self) -> &[StageEntry] { + &self.taps + } + + /// All link stages, in registration order. + pub fn links(&self) -> &[StageEntry] { + &self.links + } + + /// Assigns a name to a previously registered stage. No-op if `idx` is out of range. + pub fn set_stage_name(&mut self, kind: StageKind, idx: usize, name: &str) { + let vec = match kind { + StageKind::Source => &mut self.sources, + StageKind::Tap => &mut self.taps, + StageKind::Link => &mut self.links, + StageKind::Transform => &mut self.transforms, + }; + if let Some(entry) = vec.get_mut(idx) { + entry.name = Some(String::from(name)); + } + } + + /// Resets every stage's counters. + pub fn reset_all(&self) { + for e in self + .sources + .iter() + .chain(&self.taps) + .chain(&self.links) + .chain(&self.transforms) + { + e.metrics.reset(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_assigns_sequential_indices() { + let mut m = RecordProfilingMetrics::new(); + assert!(m.is_empty()); + let (i0, _) = m.push_tap(); + let (i1, _) = m.push_tap(); + assert_eq!((i0, i1), (0, 1)); + assert_eq!(m.tap_count(), 2); + assert!(!m.is_empty()); + assert!(m.tap(0).is_some()); + assert!(m.tap(2).is_none()); + } + + #[test] + fn set_stage_name() { + let mut m = RecordProfilingMetrics::new(); + let _ = m.push_source(); + m.set_stage_name(StageKind::Source, 0, "sensor_reader"); + assert_eq!(m.source(0).unwrap().name.as_deref(), Some("sensor_reader")); + // out of range is a no-op + m.set_stage_name(StageKind::Source, 5, "ignored"); + } + + #[test] + fn reset_all_clears_metrics() { + let mut m = RecordProfilingMetrics::new(); + let (_, src) = m.push_source(); + let (_, tap) = m.push_tap(); + src.record(10); + tap.record(20); + m.reset_all(); + assert_eq!(src.call_count(), 0); + assert_eq!(tap.call_count(), 0); + } +} diff --git a/aimdb-core/src/profiling/stage_metrics.rs b/aimdb-core/src/profiling/stage_metrics.rs new file mode 100644 index 00000000..c6d5cc4d --- /dev/null +++ b/aimdb-core/src/profiling/stage_metrics.rs @@ -0,0 +1,168 @@ +//! Per-stage timing metrics. + +use core::sync::atomic::Ordering; +use portable_atomic::AtomicU64; + +/// Sentinel used for `min_time_ns` before any sample has been recorded. +const NO_MIN: u64 = u64::MAX; + +/// Cumulative timing statistics for a single execution stage +/// (one `.source()`, `.tap()`, `.link()`, or future `.transform()`). +/// +/// All times are wall-clock nanoseconds. Updated with `Ordering::Relaxed` — these +/// are diagnostics, not synchronization primitives. Multiple producers may call +/// [`record`](Self::record) concurrently (e.g. cloned `Producer` handles). +#[derive(Debug)] +pub struct StageMetrics { + call_count: AtomicU64, + total_time_ns: AtomicU64, + min_time_ns: AtomicU64, + max_time_ns: AtomicU64, +} + +impl Default for StageMetrics { + fn default() -> Self { + Self { + call_count: AtomicU64::new(0), + total_time_ns: AtomicU64::new(0), + min_time_ns: AtomicU64::new(NO_MIN), + max_time_ns: AtomicU64::new(0), + } + } +} + +impl StageMetrics { + /// Creates an empty `StageMetrics`. + pub fn new() -> Self { + Self::default() + } + + /// Records one stage invocation that took `nanos` nanoseconds of wall-clock time. + pub fn record(&self, nanos: u64) { + self.call_count.fetch_add(1, Ordering::Relaxed); + self.total_time_ns.fetch_add(nanos, Ordering::Relaxed); + self.min_time_ns.fetch_min(nanos, Ordering::Relaxed); + self.max_time_ns.fetch_max(nanos, Ordering::Relaxed); + } + + /// Number of recorded invocations. + pub fn call_count(&self) -> u64 { + self.call_count.load(Ordering::Relaxed) + } + + /// Cumulative wall-clock time across all invocations, in nanoseconds. + pub fn total_time_ns(&self) -> u64 { + self.total_time_ns.load(Ordering::Relaxed) + } + + /// Mean wall-clock time per invocation, in nanoseconds (0 if no samples). + pub fn avg_time_ns(&self) -> u64 { + let count = self.call_count(); + self.total_time_ns().checked_div(count).unwrap_or(0) + } + + /// Fastest recorded invocation, in nanoseconds (0 if no samples). + pub fn min_time_ns(&self) -> u64 { + match self.min_time_ns.load(Ordering::Relaxed) { + NO_MIN => 0, + v => v, + } + } + + /// Slowest recorded invocation, in nanoseconds (0 if no samples). + pub fn max_time_ns(&self) -> u64 { + self.max_time_ns.load(Ordering::Relaxed) + } + + /// Clears all counters back to their initial state. + pub fn reset(&self) { + self.call_count.store(0, Ordering::Relaxed); + self.total_time_ns.store(0, Ordering::Relaxed); + self.min_time_ns.store(NO_MIN, Ordering::Relaxed); + self.max_time_ns.store(0, Ordering::Relaxed); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn zero_call_accessors_are_zero() { + let m = StageMetrics::new(); + assert_eq!(m.call_count(), 0); + assert_eq!(m.total_time_ns(), 0); + assert_eq!(m.avg_time_ns(), 0); + assert_eq!(m.min_time_ns(), 0); + assert_eq!(m.max_time_ns(), 0); + } + + #[test] + fn record_one() { + let m = StageMetrics::new(); + m.record(42); + assert_eq!(m.call_count(), 1); + assert_eq!(m.total_time_ns(), 42); + assert_eq!(m.avg_time_ns(), 42); + assert_eq!(m.min_time_ns(), 42); + assert_eq!(m.max_time_ns(), 42); + } + + #[test] + fn record_many_avg_min_max() { + let m = StageMetrics::new(); + for v in [10u64, 30, 20] { + m.record(v); + } + assert_eq!(m.call_count(), 3); + assert_eq!(m.total_time_ns(), 60); + assert_eq!(m.avg_time_ns(), 20); + assert_eq!(m.min_time_ns(), 10); + assert_eq!(m.max_time_ns(), 30); + } + + #[test] + fn reset_clears_everything() { + let m = StageMetrics::new(); + m.record(5); + m.record(7); + m.reset(); + assert_eq!(m.call_count(), 0); + assert_eq!(m.total_time_ns(), 0); + assert_eq!(m.avg_time_ns(), 0); + assert_eq!(m.min_time_ns(), 0); + assert_eq!(m.max_time_ns(), 0); + // and still usable afterwards + m.record(3); + assert_eq!(m.call_count(), 1); + assert_eq!(m.min_time_ns(), 3); + } + + #[cfg(feature = "std")] + #[test] + fn concurrent_record() { + use std::sync::Arc; + use std::thread; + + let m = Arc::new(StageMetrics::new()); + let threads = 8; + let per_thread = 1000u64; + let handles: Vec<_> = (0..threads) + .map(|_| { + let m = Arc::clone(&m); + thread::spawn(move || { + for _ in 0..per_thread { + m.record(2); + } + }) + }) + .collect(); + for h in handles { + h.join().unwrap(); + } + assert_eq!(m.call_count(), threads as u64 * per_thread); + assert_eq!(m.total_time_ns(), threads as u64 * per_thread * 2); + assert_eq!(m.min_time_ns(), 2); + assert_eq!(m.max_time_ns(), 2); + } +} diff --git a/aimdb-core/src/remote/handler.rs b/aimdb-core/src/remote/handler.rs index 0fdcb28e..cad93fac 100644 --- a/aimdb-core/src/remote/handler.rs +++ b/aimdb-core/src/remote/handler.rs @@ -579,6 +579,8 @@ where "graph.nodes" => handle_graph_nodes(db, request.id).await, "graph.edges" => handle_graph_edges(db, request.id).await, "graph.topo_order" => handle_graph_topo_order(db, request.id).await, + #[cfg(feature = "profiling")] + "profiling.reset" => handle_profiling_reset(db, config, request.id).await, _ => { #[cfg(feature = "tracing")] tracing::warn!("Unknown method: {}", request.method); @@ -625,6 +627,37 @@ where Response::success(request_id, json!(records)) } +/// Handles profiling.reset method +/// +/// Clears stage profiling counters for every record. Requires write permission. +#[cfg(all(feature = "std", feature = "profiling"))] +async fn handle_profiling_reset( + db: &Arc>, + config: &AimxConfig, + request_id: u64, +) -> Response +where + R: crate::RuntimeAdapter + crate::Spawn + 'static, +{ + if matches!( + config.security_policy, + crate::remote::SecurityPolicy::ReadOnly + ) { + return Response::error( + request_id, + "permission_denied", + "profiling.reset requires write permission (ReadOnly security policy)".to_string(), + ); + } + + db.reset_stage_profiling(); + + #[cfg(feature = "tracing")] + tracing::info!("Stage profiling counters reset"); + + Response::success(request_id, json!({ "reset": true })) +} + /// Handles record.get method /// /// Returns the current value of a record as JSON. diff --git a/aimdb-core/src/remote/metadata.rs b/aimdb-core/src/remote/metadata.rs index fb08950e..3870464c 100644 --- a/aimdb-core/src/remote/metadata.rs +++ b/aimdb-core/src/remote/metadata.rs @@ -77,6 +77,13 @@ pub struct RecordMetadata { #[cfg(feature = "metrics")] #[serde(skip_serializing_if = "Option::is_none")] pub occupancy: Option<(usize, usize)>, + + // ===== Stage profiling (feature-gated) ===== + /// Per-stage timing metrics (`.source()`/`.tap()`/`.link()`), if the + /// `profiling` feature is enabled and any stage has been registered. + #[cfg(feature = "profiling")] + #[serde(skip_serializing_if = "Option::is_none")] + pub stage_profiling: Option>, } impl RecordMetadata { @@ -132,6 +139,8 @@ impl RecordMetadata { dropped_count: None, #[cfg(feature = "metrics")] occupancy: None, + #[cfg(feature = "profiling")] + stage_profiling: None, } } @@ -162,6 +171,18 @@ impl RecordMetadata { } self } + + /// Attaches a stage profiling snapshot (profiling feature only). + #[cfg(feature = "profiling")] + pub fn with_stage_profiling( + mut self, + stages: std::vec::Vec, + ) -> Self { + if !stages.is_empty() { + self.stage_profiling = Some(stages); + } + self + } } #[cfg(test)] diff --git a/aimdb-core/src/typed_api.rs b/aimdb-core/src/typed_api.rs index 7d3ae438..2d0fda96 100644 --- a/aimdb-core/src/typed_api.rs +++ b/aimdb-core/src/typed_api.rs @@ -93,6 +93,9 @@ pub struct Producer { db: Arc>, /// Record key for key-based routing (required - all records have keys) record_key: String, + /// Stage profiling state (set by the spawn machinery for `.source()` stages). + #[cfg(feature = "profiling")] + profiling: Option>, /// Phantom data to bind the type parameter T _phantom: PhantomData, } @@ -107,10 +110,24 @@ where Self { db, record_key: key, + #[cfg(feature = "profiling")] + profiling: None, _phantom: PhantomData, } } + /// Attaches stage profiling state. Internal — called by the spawn machinery. + #[cfg(feature = "profiling")] + pub(crate) fn set_profiling( + &mut self, + metrics: Arc, + clock: crate::profiling::Clock, + ) { + self.profiling = Some(Arc::new(crate::profiling::ProducerProfilingState::new( + metrics, clock, + ))); + } + /// Produce a value of type T /// /// This triggers the entire pipeline for this record type: @@ -118,6 +135,10 @@ where /// 2. All link connectors are triggered /// 3. Buffers are updated (if configured) pub async fn produce(&self, value: T) -> DbResult<()> { + #[cfg(feature = "profiling")] + if let Some(state) = &self.profiling { + state.record_produce(); + } self.db.produce::(&self.record_key, value).await } @@ -135,6 +156,8 @@ where Self { db: self.db.clone(), record_key: self.record_key.clone(), + #[cfg(feature = "profiling")] + profiling: self.profiling.clone(), _phantom: PhantomData, } } @@ -197,6 +220,9 @@ pub struct Consumer { db: Arc>, /// Record key for key-based routing (required - all records have keys) record_key: String, + /// Stage profiling state (set by the spawn machinery for `.tap()` / `.link()`). + #[cfg(feature = "profiling")] + profiling: Option<(Arc, crate::profiling::Clock)>, /// Phantom data to bind the type parameter T _phantom: PhantomData, } @@ -211,15 +237,36 @@ where Self { db, record_key: key, + #[cfg(feature = "profiling")] + profiling: None, _phantom: PhantomData, } } + /// Attaches stage profiling state. Internal — called by the spawn machinery. + #[cfg(feature = "profiling")] + pub(crate) fn set_profiling( + &mut self, + metrics: Arc, + clock: crate::profiling::Clock, + ) { + self.profiling = Some((metrics, clock)); + } + /// Subscribe to updates for this record type /// /// Returns a reader that yields values when they are produced. pub fn subscribe(&self) -> DbResult + Send>> { - self.db.subscribe::(&self.record_key) + let reader = self.db.subscribe::(&self.record_key)?; + #[cfg(feature = "profiling")] + if let Some((metrics, clock)) = &self.profiling { + return Ok(Box::new(crate::profiling::ProfilingBufferReader::new( + reader, + metrics.clone(), + clock.clone(), + ))); + } + Ok(reader) } /// Returns the key this consumer is bound to @@ -284,6 +331,21 @@ where type TypedSerializerFn = Arc Result, crate::connector::SerializeError> + Send + Sync + 'static>; +/// Kind of execution stage, used to address per-stage profiling metrics and to +/// remember which stage `RecordRegistrar::with_name` should rename. +#[doc(hidden)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum StageKind { + /// A `.source()` producer callback. + Source, + /// A `.tap()` observer callback. + Tap, + /// An outbound `.link_to()` connector. + Link, + /// A `.transform()` callback (reserved; not yet instrumented). + Transform, +} + /// Registrar for configuring a typed record /// /// Provides a fluent API for registering producer and consumer functions. @@ -301,6 +363,10 @@ pub struct RecordRegistrar< /// Extension storage from the builder — allows external crates (e.g. /// `aimdb-persistence`) to retrieve typed state inside `.persist()`. pub(crate) extensions: &'a crate::extensions::Extensions, + /// The most recently registered stage, so `.with_name()` knows what to name. + /// Tracked even when the `profiling` feature is off (then it's just unused). + #[cfg_attr(not(feature = "profiling"), allow(dead_code))] + pub(crate) last_stage: Option<(StageKind, usize)>, } impl<'a, T, R> RecordRegistrar<'a, T, R> @@ -317,6 +383,26 @@ where self.extensions } + /// Assigns a human-readable name to the stage registered immediately before + /// this call (the most recent `.source()`, `.tap()`, or `.link_to()`). + /// + /// The name shows up in stage profiling output. This method is always + /// available; when the `profiling` feature is disabled it is a no-op. + /// + /// ```rust,ignore + /// reg.source(|ctx, producer| async move { /* ... */ }) + /// .with_name("sensor_reader"); + /// ``` + pub fn with_name(&'a mut self, name: &str) -> &'a mut Self { + #[cfg(feature = "profiling")] + if let Some((kind, idx)) = self.last_stage { + self.rec.profiling_mut().set_stage_name(kind, idx, name); + } + #[cfg(not(feature = "profiling"))] + let _ = name; + self + } + /// Registers a producer service for this record type (low-level API) /// /// **Note:** This is the foundational API used by runtime adapter implementations. @@ -337,6 +423,15 @@ where Fut: Future + Send + 'static, { self.rec.set_producer_service(f); + #[cfg(feature = "profiling")] + { + let (idx, _) = self.rec.profiling_mut().push_source(); + self.last_stage = Some((StageKind::Source, idx)); + } + #[cfg(not(feature = "profiling"))] + { + self.last_stage = Some((StageKind::Source, 0)); + } self } @@ -360,6 +455,15 @@ where T: Sync, { self.rec.add_consumer(f); + #[cfg(feature = "profiling")] + { + let (idx, _) = self.rec.profiling_mut().push_tap(); + self.last_stage = Some((StageKind::Tap, idx)); + } + #[cfg(not(feature = "profiling"))] + { + self.last_stage = Some((StageKind::Tap, 0)); + } self } @@ -450,6 +554,9 @@ where let pipeline = build_fn(builder); let descriptor = pipeline.into_descriptor(); self.rec.set_transform(descriptor); + // Transform stages are not yet instrumented; track the kind so `.with_name()` + // remains coherent (it currently has nowhere to store the name). + self.last_stage = Some((StageKind::Transform, 0)); self } @@ -466,6 +573,7 @@ where let pipeline = build_fn(builder); let descriptor = pipeline.into_descriptor(); self.rec.set_transform(descriptor); + self.last_stage = Some((StageKind::Transform, 0)); self } @@ -741,6 +849,19 @@ where ); } + // Register the link as a profiling stage (so `.with_name()` can name it + // and the consumer it creates can be timed). + #[cfg(feature = "profiling")] + let link_metrics = { + let (idx, metrics) = self.registrar.rec.profiling_mut().push_link(); + self.registrar.last_stage = Some((StageKind::Link, idx)); + metrics + }; + #[cfg(not(feature = "profiling"))] + { + self.registrar.last_stage = Some((StageKind::Link, 0)); + } + // Store consumer factory that captures type T and record key // This allows the connector to subscribe to values without knowing T at compile time { @@ -754,8 +875,11 @@ where let db = Arc::new(db_ref.clone()); // Create Consumer with captured type T and record key - Box::new(Consumer::::new(db, record_key.clone())) - as Box + #[allow(unused_mut)] + let mut consumer = Consumer::::new(db.clone(), record_key.clone()); + #[cfg(feature = "profiling")] + consumer.set_profiling(link_metrics.clone(), db.profiling_clock().clone()); + Box::new(consumer) as Box }, )); } @@ -1096,6 +1220,9 @@ mod tests { fn sleep(&self, _duration: u64) -> impl Future + Send { core::future::ready(()) } + fn duration_as_nanos(&self, duration: u64) -> u64 { + duration + } } impl aimdb_executor::Logger for MockRuntime { @@ -1148,6 +1275,7 @@ mod tests { connector_builders: builders, record_key: "test::Record".to_string(), extensions, + last_stage: None, } } diff --git a/aimdb-core/src/typed_record.rs b/aimdb-core/src/typed_record.rs index 82375776..7f0135ed 100644 --- a/aimdb-core/src/typed_record.rs +++ b/aimdb-core/src/typed_record.rs @@ -398,6 +398,12 @@ pub trait AnyRecord: Send + Sync { /// Get the inbound connector links for this record fn inbound_connectors(&self) -> &[crate::connector::InboundConnectorLink]; + + /// Resets this record's stage profiling counters (feature `profiling`). + /// + /// Default implementation is a no-op; `TypedRecord` overrides it. + #[cfg(feature = "profiling")] + fn reset_profiling(&self) {} } // Helper extension trait for type-safe downcasting @@ -555,6 +561,12 @@ pub struct TypedRecord, + /// Per-stage profiling metrics (feature `profiling`). + /// Stages are appended here in the same order they are registered on the + /// `RecordRegistrar`, which matches the order the spawn machinery iterates them. + #[cfg(feature = "profiling")] + profiling: crate::profiling::RecordProfilingMetrics, + /// Metadata tracking (std only - for remote access) #[cfg(feature = "std")] metadata: RecordMetadataTracker, @@ -605,6 +617,8 @@ impl Type buffer_cfg: None, outbound_connectors: Vec::new(), inbound_connectors: Vec::new(), + #[cfg(feature = "profiling")] + profiling: crate::profiling::RecordProfilingMetrics::new(), #[cfg(feature = "std")] metadata: RecordMetadataTracker::new::(), #[cfg(feature = "std")] @@ -618,6 +632,19 @@ impl Type } } + /// Stage profiling metrics for this record (feature `profiling`). + #[cfg(feature = "profiling")] + pub fn profiling(&self) -> &crate::profiling::RecordProfilingMetrics { + &self.profiling + } + + /// Mutable access to the stage profiling metrics — used during registration + /// to append per-stage entries and assign names. + #[cfg(feature = "profiling")] + pub(crate) fn profiling_mut(&mut self) -> &mut crate::profiling::RecordProfilingMetrics { + &mut self.profiling + } + /// Sets the producer service for this record /// /// Long-running task that generates data via `producer.produce()`. Auto-spawned during `build()`. @@ -1154,10 +1181,22 @@ impl Type #[cfg(feature = "tracing")] let count = consumers.len(); + // Invariant: taps are pushed to `profiling` in the same order consumers are + // added (both happen in `RecordRegistrar::tap_raw`), so index `i` lines up. + #[cfg(feature = "profiling")] + debug_assert_eq!(self.profiling.tap_count(), consumers.len()); + // Spawn each consumer as a background task - for consumer_fn in consumers { + #[cfg_attr(not(feature = "profiling"), allow(unused_variables))] + for (i, consumer_fn) in consumers.into_iter().enumerate() { // Create a Consumer handle bound to the specific record key - let consumer = crate::typed_api::Consumer::new(db.clone(), record_key.to_string()); + #[allow(unused_mut)] + let mut consumer = crate::typed_api::Consumer::new(db.clone(), record_key.to_string()); + + #[cfg(feature = "profiling")] + if let Some(entry) = self.profiling.tap(i) { + consumer.set_profiling(entry.metrics.clone(), db.profiling_clock().clone()); + } // Get runtime context as Arc by cloning the Arc let runtime_any: Arc = runtime.clone(); @@ -1212,7 +1251,14 @@ impl Type ); // Create Producer bound to the specific record key - let producer = crate::typed_api::Producer::new(db.clone(), record_key.to_string()); + #[allow(unused_mut)] + let mut producer = crate::typed_api::Producer::new(db.clone(), record_key.to_string()); + + #[cfg(feature = "profiling")] + if let Some(entry) = self.profiling.source(0) { + producer.set_profiling(entry.metrics.clone(), db.profiling_clock().clone()); + } + let ctx: Arc = runtime.clone(); // Call the service function to get the future @@ -1482,6 +1528,10 @@ impl &[crate::connector::InboundConnectorLink] { &self.inbound_connectors } + + #[cfg(feature = "profiling")] + fn reset_profiling(&self) { + self.profiling.reset_all(); + } } diff --git a/aimdb-embassy-adapter/CHANGELOG.md b/aimdb-embassy-adapter/CHANGELOG.md index 8cda9f25..cb856c4a 100644 --- a/aimdb-embassy-adapter/CHANGELOG.md +++ b/aimdb-embassy-adapter/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **`profiling` feature** (Issue #58): Forwards to `aimdb-core/profiling` and enables the Embassy runtime clock for stage timing. Pulls `portable-atomic` with `fallback` + `critical-section` (via `aimdb-core/profiling`) to emulate 64-bit atomics on targets without native `AtomicU64` (e.g. `thumbv7em-none-eabihf`). The final binary must provide a `critical-section` implementation — cortex-m and Embassy HALs already do. +- **`TimeOps::duration_as_nanos` implementation**: Computes `duration.as_micros() * 1_000` (saturating). Microsecond resolution is the portable lower bound across Embassy tick-rate configurations. +- **Cross-compile check in `make test-embedded`**: `cargo check --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,profiling"` to guard the no_std + profiling path. - **`EmbassyJoinQueue` (Design 027)**: Embassy implementation of the `JoinFanInRuntime` traits from `aimdb-executor`, backed by `embassy_sync::channel::Channel`. The channel is `Box::leak`ed at queue creation (once per join transform at DB startup) to obtain the `&'static` lifetime Embassy channels require. Embassy channels never close — the trigger loop runs for the device lifetime. - **`SpmcRing` subscriber-slot exhaustion diagnostics**: `defmt::error!` now fires when a `.subscribe()` call fails because the const-generic `SUBS` slot count is exhausted. Includes guidance to count one slot per `.link_to()` plus one per `transform_join` input. - **Improved `buffer_sized` doc**: explicit rules for counting `CONSUMERS` (one per `.tap()`, `.link_to()`, and `transform_join` input). diff --git a/aimdb-embassy-adapter/Cargo.toml b/aimdb-embassy-adapter/Cargo.toml index bdd18844..c2ff65d0 100644 --- a/aimdb-embassy-adapter/Cargo.toml +++ b/aimdb-embassy-adapter/Cargo.toml @@ -38,6 +38,12 @@ std = [] # This feature exists only to silence cfg warnings and for API consistency. metrics = [] +# Automatic stage profiling. Uses portable-atomic (with the `critical-section` +# fallback for 64-bit atomics, pulled in via aimdb-core/profiling) plus the Embassy +# clock, so it works on no_std targets. The final binary must provide a +# `critical-section` implementation (cortex-m / embassy HALs do). +profiling = ["aimdb-core/profiling", "embassy-runtime"] + [dependencies] # Executor traits aimdb-executor = { version = "0.1.0", path = "../aimdb-executor", default-features = false } diff --git a/aimdb-embassy-adapter/src/runtime.rs b/aimdb-embassy-adapter/src/runtime.rs index deca042f..90921b93 100644 --- a/aimdb-embassy-adapter/src/runtime.rs +++ b/aimdb-embassy-adapter/src/runtime.rs @@ -297,6 +297,12 @@ impl aimdb_executor::TimeOps for EmbassyAdapter { fn sleep(&self, duration: Self::Duration) -> impl core::future::Future + Send { embassy_time::Timer::after(duration) } + + fn duration_as_nanos(&self, duration: Self::Duration) -> u64 { + // `embassy_time::Duration` resolution depends on the configured tick rate; + // microsecond granularity is the portable lower bound. + duration.as_micros().saturating_mul(1_000) + } } // Implement Logger trait diff --git a/aimdb-executor/CHANGELOG.md b/aimdb-executor/CHANGELOG.md index 28a80f15..dbcc4d8a 100644 --- a/aimdb-executor/CHANGELOG.md +++ b/aimdb-executor/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **`TimeOps::duration_as_nanos`** — new required method on the `TimeOps` trait that returns the number of whole nanoseconds in a `Self::Duration`. Introduced for stage profiling (Issue #58) so features can convert elapsed time to a numeric, runtime-agnostic representation without binding to `std::time`. Implementations should saturate rather than overflow for durations larger than `u64::MAX` nanoseconds. - **Join fan-in traits (Design 027)**: Runtime-agnostic abstraction for multi-input transform fan-in queues, replacing the previous `tokio::sync::mpsc`-only path in `aimdb-core`. - `JoinFanInRuntime` trait — runtime capability for creating bounded fan-in queues; implemented per adapter. - `JoinQueue` trait — splittable into `Sender` / `Receiver` halves. diff --git a/aimdb-executor/src/lib.rs b/aimdb-executor/src/lib.rs index ba888385..bbb22ba1 100644 --- a/aimdb-executor/src/lib.rs +++ b/aimdb-executor/src/lib.rs @@ -88,6 +88,13 @@ pub trait TimeOps: RuntimeAdapter { fn secs(&self, secs: u64) -> Self::Duration; fn micros(&self, micros: u64) -> Self::Duration; fn sleep(&self, duration: Self::Duration) -> impl Future + Send; + + /// Returns the number of whole nanoseconds in `duration`. + /// + /// Used by features (e.g. stage profiling) that need a numeric, runtime-agnostic + /// representation of an elapsed [`Self::Duration`]. Implementations should saturate + /// rather than overflow for durations larger than `u64::MAX` nanoseconds. + fn duration_as_nanos(&self, duration: Self::Duration) -> u64; } /// Logging trait - enables ctx.log() accessor diff --git a/aimdb-tokio-adapter/CHANGELOG.md b/aimdb-tokio-adapter/CHANGELOG.md index d271026a..5e3b5949 100644 --- a/aimdb-tokio-adapter/CHANGELOG.md +++ b/aimdb-tokio-adapter/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **`profiling` feature** (Issue #58): Forwards to `aimdb-core/profiling`. Enables automatic per-stage wall-clock timing for `.source()`, `.tap()`, and `.link()` on the Tokio runtime. Off by default; zero overhead when disabled. +- **`TimeOps::duration_as_nanos` implementation**: Returns `Duration::as_nanos()` saturated to `u64::MAX`. Required by the new `aimdb-executor` trait method. +- **Stage profiling integration test** (`tests/stage_profiling.rs`, gated on `--features profiling`): Drives a record with a periodic `.source()` and a slow `.tap()`, then asserts that call counts are recorded, the `min ≤ avg ≤ max` invariant holds, `.with_name(...)` is reflected on the registered stages, and `reset_all()` clears the counters. - **`TokioJoinQueue` (Design 027)**: Tokio implementation of the `JoinFanInRuntime` traits from `aimdb-executor`, backed by `tokio::sync::mpsc::channel` with internal capacity 64. Enables `transform_join` on the Tokio runtime through the new runtime-agnostic abstraction. - **`transform_join` integration tests** (`tests/transform_join_integration_tests.rs`): two-input sum scenario plus a backpressure stress test that pushes 200 events through a yielding handler to verify the bounded fan-in does not deadlock. diff --git a/aimdb-tokio-adapter/Cargo.toml b/aimdb-tokio-adapter/Cargo.toml index 79881b11..e7714f91 100644 --- a/aimdb-tokio-adapter/Cargo.toml +++ b/aimdb-tokio-adapter/Cargo.toml @@ -23,6 +23,7 @@ tokio-runtime = ["tokio", "std"] # Observability features tracing = ["aimdb-core/tracing", "dep:tracing"] metrics = ["aimdb-core/metrics", "tokio-runtime"] +profiling = ["aimdb-core/profiling", "tokio-runtime"] # Testing features test-utils = ["aimdb-core/test-utils", "tokio-runtime"] diff --git a/aimdb-tokio-adapter/src/runtime.rs b/aimdb-tokio-adapter/src/runtime.rs index 7f672222..f3861864 100644 --- a/aimdb-tokio-adapter/src/runtime.rs +++ b/aimdb-tokio-adapter/src/runtime.rs @@ -190,6 +190,10 @@ impl TimeOps for TokioAdapter { fn sleep(&self, duration: Self::Duration) -> impl Future + Send { tokio::time::sleep(duration) } + + fn duration_as_nanos(&self, duration: Self::Duration) -> u64 { + duration.as_nanos().min(u64::MAX as u128) as u64 + } } #[cfg(feature = "tokio-runtime")] diff --git a/aimdb-tokio-adapter/tests/stage_profiling.rs b/aimdb-tokio-adapter/tests/stage_profiling.rs new file mode 100644 index 00000000..e371781a --- /dev/null +++ b/aimdb-tokio-adapter/tests/stage_profiling.rs @@ -0,0 +1,120 @@ +//! Integration tests for automatic stage profiling (`profiling` feature). +//! +//! Registers a record with a periodic `.source()` and a slow `.tap()`, runs the +//! pipeline briefly, then inspects the per-stage `StageMetrics` collected on the +//! `TypedRecord` — verifying call counts, the `min <= avg <= max` invariant, and +//! that `.with_name(...)` is recorded. +//! +//! The whole module compiles to nothing without `--features profiling`. + +#![cfg(feature = "profiling")] + +use aimdb_core::buffer::BufferCfg; +use aimdb_core::AimDbBuilder; +use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt}; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Debug, Clone)] +struct Reading { + value: u64, +} + +const KEY: &str = "profiling::Reading"; + +#[tokio::test(flavor = "multi_thread")] +async fn source_and_tap_stages_are_timed_and_named() { + let adapter = Arc::new(TokioAdapter); + let mut builder = AimDbBuilder::new().runtime(adapter); + + builder.configure::(KEY, |reg| { + reg.buffer(BufferCfg::SpmcRing { capacity: 32 }) + .source(|ctx, producer| async move { + let mut n = 0u64; + loop { + let _ = producer.produce(Reading { value: n }).await; + n += 1; + ctx.time().sleep(ctx.time().millis(10)).await; + } + }) + .with_name("sensor_reader") + .tap(|ctx, consumer| async move { + let mut reader = consumer.subscribe().expect("subscribe"); + while let Ok(reading) = reader.recv().await { + // Simulate per-value processing work (wall-clock). + let _ = reading.value; + ctx.time().sleep(ctx.time().millis(5)).await; + } + }) + .with_name("data_processor"); + }); + + let db = builder.build().await.expect("build"); + + // Let the pipeline run for a bunch of iterations. + tokio::time::sleep(Duration::from_millis(250)).await; + + let rec = db + .inner() + .get_typed_record_by_key::(KEY) + .expect("typed record"); + let prof = rec.profiling(); + + // Source stage. + let source = prof.source(0).expect("source stage registered"); + assert_eq!(source.name.as_deref(), Some("sensor_reader")); + let s = &source.metrics; + assert!( + s.call_count() >= 1, + "expected at least one source interval recorded, got {}", + s.call_count() + ); + assert!(s.min_time_ns() <= s.avg_time_ns()); + assert!(s.avg_time_ns() <= s.max_time_ns()); + assert_eq!(s.avg_time_ns(), s.total_time_ns() / s.call_count()); + + // Tap stage. + let tap = prof.tap(0).expect("tap stage registered"); + assert_eq!(tap.name.as_deref(), Some("data_processor")); + let t = &tap.metrics; + assert!( + t.call_count() >= 1, + "expected at least one tap invocation recorded, got {}", + t.call_count() + ); + assert!(t.min_time_ns() <= t.avg_time_ns()); + assert!(t.avg_time_ns() <= t.max_time_ns()); + + // No link stage was registered. + assert!(prof.link(0).is_none()); + assert!(prof.links().is_empty()); + + // Reset clears everything. + prof.reset_all(); + assert_eq!(prof.source(0).unwrap().metrics.call_count(), 0); + assert_eq!(prof.tap(0).unwrap().metrics.call_count(), 0); +} + +#[tokio::test(flavor = "multi_thread")] +async fn with_name_is_a_no_op_friendly_builder() { + // `.with_name()` must always be chainable even on a stage that is never run. + let adapter = Arc::new(TokioAdapter); + let mut builder = AimDbBuilder::new().runtime(adapter); + builder.configure::("profiling::Unused", |reg| { + reg.buffer(BufferCfg::SingleLatest) + .tap(|_ctx, consumer| async move { + let mut reader = consumer.subscribe().expect("subscribe"); + let _ = reader.recv().await; + }) + .with_name("idle_tap"); + }); + let db = builder.build().await.expect("build"); + let rec = db + .inner() + .get_typed_record_by_key::("profiling::Unused") + .expect("typed record"); + assert_eq!( + rec.profiling().tap(0).unwrap().name.as_deref(), + Some("idle_tap") + ); +} diff --git a/aimdb-wasm-adapter/CHANGELOG.md b/aimdb-wasm-adapter/CHANGELOG.md index ae0c50dd..cf1433bd 100644 --- a/aimdb-wasm-adapter/CHANGELOG.md +++ b/aimdb-wasm-adapter/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **`TimeOps::duration_as_nanos` implementation**: Converts `WasmDuration` (milliseconds as `f64`) to nanoseconds, clamped to `[0, u64::MAX]`. Required by the new `aimdb-executor` trait method (used by stage profiling, Issue #58). - **`WasmJoinQueue` (Design 027)**: WASM implementation of the `JoinFanInRuntime` traits from `aimdb-executor`, backed by `futures_channel::mpsc::channel` with internal capacity 64. Enables `transform_join` on the WASM runtime. - **`transform_join` integration test** (`tests/transform_join_integration_tests.rs`, `wasm-bindgen-test`): two-input sum scenario verifying outputs are produced once both inputs have been seen. diff --git a/aimdb-wasm-adapter/src/time.rs b/aimdb-wasm-adapter/src/time.rs index 895ba8de..70dc2ba3 100644 --- a/aimdb-wasm-adapter/src/time.rs +++ b/aimdb-wasm-adapter/src/time.rs @@ -174,4 +174,14 @@ impl TimeOps for WasmAdapter { SendFuture(core::future::ready(())) } } + + fn duration_as_nanos(&self, duration: WasmDuration) -> u64 { + // WasmDuration is milliseconds as f64. + let nanos = duration.0.max(0.0) * 1_000_000.0; + if nanos >= u64::MAX as f64 { + u64::MAX + } else { + nanos as u64 + } + } } diff --git a/docs/design/014-M6-stage-profiling.md b/docs/design/014-M6-stage-profiling.md index 0b2fe65f..1a8ec649 100644 --- a/docs/design/014-M6-stage-profiling.md +++ b/docs/design/014-M6-stage-profiling.md @@ -1,14 +1,31 @@ # RFC: Integrated Stage Profiling -> **Status**: 📝 Draft (RFC - Not Yet Implemented) +> **Status**: ✅ Implemented (issue #58) > **Target**: `aimdb-core` (feature-flagged) -> **Priority**: Medium -> **Estimated Effort**: 2-3 days > **Feature Flag**: `profiling` > **Date**: November 2025 --- +## Implementation notes (vs. this RFC) + +- Timing is **wrapped around `Producer::produce()` and the buffer reader returned by + `Consumer::subscribe()`**, not around per-value handler closures (the actual + `.source()`/`.tap()` API takes whole-task closures that loop internally). A + source's "iteration time" is the interval between `produce()` calls; a tap's + "invocation time" is the interval from a `recv()` yielding a value to the next + `recv()` call. +- Timing uses the **runtime clock** (`aimdb_executor::TimeOps`, extended with + `duration_as_nanos`), not `std::time::Instant` — so the feature works on `no_std` + / Embassy too. `StageMetrics` uses `portable_atomic::AtomicU64` (the + `profiling` feature pulls `portable-atomic/critical-section` for the 64-bit-atomic + fallback on targets that need it). +- `RecordRegistrar::with_name("...")` is always available (no-op without the + feature). MCP tools: `get_stage_profiling` (incl. bottleneck detection) and + `reset_stage_profiling`. + +--- + ## Summary Add automatic timing instrumentation to AimDB's execution primitives (`.source()`, `.tap()`, `.link()`, and future `.transform()`), enabling users to identify slow stages without manual instrumentation. diff --git a/examples/remote-access-demo/Cargo.toml b/examples/remote-access-demo/Cargo.toml index 180254db..f6382d33 100644 --- a/examples/remote-access-demo/Cargo.toml +++ b/examples/remote-access-demo/Cargo.toml @@ -13,10 +13,11 @@ name = "client" path = "src/client.rs" [dependencies] -aimdb-core = { path = "../../aimdb-core", features = ["std", "tracing"] } +aimdb-core = { path = "../../aimdb-core", features = ["std", "tracing", "profiling"] } aimdb-tokio-adapter = { path = "../../aimdb-tokio-adapter", features = [ "tokio-runtime", "tracing", + "profiling", ] } tokio = { version = "1.48", features = ["full"] } tracing = "0.1" diff --git a/examples/remote-access-demo/README.md b/examples/remote-access-demo/README.md index 80cfc2c8..df53fcb5 100644 --- a/examples/remote-access-demo/README.md +++ b/examples/remote-access-demo/README.md @@ -5,10 +5,10 @@ This example demonstrates the AimX v1 remote access protocol with `record.list` ## What It Does **Server** (`server.rs`): -- Creates an AimDB instance with 4 record types (Temperature, SystemStatus, UserEvent, Config) +- Creates an AimDB instance with 5 record types (Temperature, SystemStatus, UserEvent, Config, AppSettings) - Enables remote access on Unix domain socket `/tmp/aimdb-demo.sock` -- Uses ReadOnly security policy -- Populates some initial data +- Uses ReadWrite security policy (`server::AppSettings` is the only writable key) +- Drives `Temperature` and `SystemStatus` from in-AimDB `.source()` tasks with named `.tap()` consumers, so the `profiling` feature can time every stage automatically (see [Stage Profiling](#stage-profiling) below) **Client** (`client.rs`): - Connects to the server via Unix domain socket @@ -87,6 +87,35 @@ echo '{"version":"1.0","client":"test"}' | socat - UNIX-CONNECT:/tmp/aimdb-demo. - Write permissions - Timestamps +## Stage Profiling + +The server is built with the `profiling` feature (see [Cargo.toml](Cargo.toml)) and +registers both `Temperature` and `SystemStatus` via `.source()` + `.tap()` so AimDB +owns the producer/consumer tasks and can time them automatically. Each stage is +named via `.with_name("...")`: + +| Record | Source stage | Tap stage | +|----------------|----------------------|----------------------------------------------| +| Temperature | `temp_simulator` | `temp_logger` (fast) | +| SystemStatus | `status_simulator` | `slow_status_processor` (sleeps 100 ms each) | + +Once the server has been running for a few seconds, query stage profiling via the +`aimdb-mcp` server using the `get_stage_profiling` tool with `record_key="SystemStatus"`. +The result is a list of `{call_count, avg_time_ns, min_time_ns, max_time_ns, name, ...}` +entries plus a `bottleneck` field — for SystemStatus the bottleneck will point at +`slow_status_processor` with an average around 100 ms. Reset the counters between +windows with the `reset_stage_profiling` tool. + +You can also call the raw RPC method directly: + +```bash +# reset the counters (requires write permission, ReadWrite policy already enabled) +echo '{"id":1,"method":"profiling.reset"}' | socat - UNIX-CONNECT:/tmp/aimdb-demo.sock +``` + +The per-stage snapshot is also embedded in each record's metadata via the +`stage_profiling` field of `record.list`. + ## Next Steps Future enhancements will add: diff --git a/examples/remote-access-demo/src/server.rs b/examples/remote-access-demo/src/server.rs index 254de9de..abec1cad 100644 --- a/examples/remote-access-demo/src/server.rs +++ b/examples/remote-access-demo/src/server.rs @@ -13,7 +13,7 @@ //! ``` use aimdb_core::remote::{AimxConfig, SecurityPolicy}; -use aimdb_core::{buffer::BufferCfg, AimDbBuilder}; +use aimdb_core::{buffer::BufferCfg, AimDbBuilder, Consumer, Producer, RuntimeContext}; use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -96,15 +96,31 @@ async fn main() -> Result<(), Box> { .with_remote_access(remote_config); // Configure records - // Use SpmcRing for Temperature and SystemStatus to support record.drain history + // Use SpmcRing for Temperature and SystemStatus to support record.drain history. + // + // Temperature and SystemStatus use `.source()` + `.tap()` so AimDB owns the + // producer/consumer task lifecycle — this also makes them eligible for + // automatic stage profiling (feature `profiling`, query via the MCP + // `get_stage_profiling` tool). `.with_name("...")` gives each stage a + // human-readable label that shows up in the profiling output. builder.configure::("server::Temperature", |reg| { reg.buffer(BufferCfg::SpmcRing { capacity: 100 }) - .with_remote_access(); + .with_remote_access() + .source(temperature_simulator) + .with_name("temp_simulator") + .tap(temperature_logger) + .with_name("temp_logger"); }); builder.configure::("server::SystemStatus", |reg| { reg.buffer(BufferCfg::SpmcRing { capacity: 50 }) - .with_remote_access(); + .with_remote_access() + .source(system_status_simulator) + .with_name("status_simulator") + // Deliberately slow consumer — surfaces as the bottleneck in + // `get_stage_profiling` for the SystemStatus record. + .tap(slow_status_processor) + .with_name("slow_status_processor"); }); builder.configure::("server::UserEvent", |reg| { @@ -131,31 +147,6 @@ async fn main() -> Result<(), Box> { info!("📝 Populating initial record data..."); - // Produce some initial data - let temp_producer = db.producer::("server::Temperature"); - let initial_duration = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap(); - let initial_timestamp = initial_duration.as_secs() as f64 - + initial_duration.subsec_nanos() as f64 / 1_000_000_000.0; - - temp_producer - .produce(Temperature { - sensor_id: "sensor-01".to_string(), - celsius: 22.5, - timestamp: initial_timestamp, - }) - .await?; - - let status_producer = db.producer::("server::SystemStatus"); - status_producer - .produce(SystemStatus { - uptime_seconds: 3600, - cpu_usage: 15.3, - memory_usage: 42.7, - }) - .await?; - let config_producer = db.producer::("server::Config"); config_producer .produce(Config { @@ -177,68 +168,8 @@ async fn main() -> Result<(), Box> { )?; info!("✅ Initial data populated"); - - // Spawn background task to continuously update Temperature - info!("🌡️ Starting live temperature simulator..."); - let temp_producer_clone = temp_producer.clone(); - tokio::spawn(async move { - let mut counter = 0u64; - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - - counter += 1; - let temp = 20.0 + (counter as f64 * 0.5) + (counter as f64 % 10.0); - - let duration = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap(); - let timestamp = - duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; - - let reading = Temperature { - sensor_id: format!("sensor-{:02}", (counter % 3) + 1), - celsius: temp, - timestamp, - }; - - if let Err(e) = temp_producer_clone.produce(reading.clone()).await { - tracing::error!("Failed to produce temperature: {}", e); - } else { - tracing::debug!( - "📊 Produced temperature: {} °C from {}", - reading.celsius, - reading.sensor_id - ); - } - } - }); - - // Spawn background task to update SystemStatus - info!("💻 Starting system status simulator..."); - let status_producer_clone = status_producer.clone(); - tokio::spawn(async move { - let mut uptime = 3600u64; - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - - uptime += 5; - let status = SystemStatus { - uptime_seconds: uptime, - cpu_usage: 10.0 + (uptime as f64 % 30.0), - memory_usage: 40.0 + ((uptime as f64 / 10.0) % 20.0), - }; - - if let Err(e) = status_producer_clone.produce(status.clone()).await { - tracing::error!("Failed to produce system status: {}", e); - } else { - tracing::debug!( - "📊 Produced system status: CPU {:.1}%, MEM {:.1}%", - status.cpu_usage, - status.memory_usage - ); - } - } - }); + info!("🌡️ Temperature simulator running via .source() (every 2s)"); + info!("💻 SystemStatus simulator running via .source() (every 5s)"); info!(""); info!("🎯 Server ready! Connect with:"); @@ -254,6 +185,10 @@ async fn main() -> Result<(), Box> { info!(" Temperature: every 2 seconds"); info!(" SystemStatus: every 5 seconds"); info!(""); + info!("📈 Stage profiling is enabled. Query it via the aimdb-mcp tools:"); + info!(" get_stage_profiling record_key=\"Temperature\" → per-stage timing"); + info!(" reset_stage_profiling → clear counters"); + info!(""); info!("Press Ctrl+C to stop the server"); // Keep server running @@ -263,3 +198,104 @@ async fn main() -> Result<(), Box> { Ok(()) } + +// ============================================================================ +// SOURCES & TAPS — owned by AimDB so the stage-profiling feature can time them +// ============================================================================ + +/// Periodically produces Temperature readings. +/// +/// Because this runs inside a `.source()` callback, every `produce()` call is +/// timed by the `profiling` feature — see `get_stage_profiling`. +async fn temperature_simulator( + ctx: RuntimeContext, + temperature: Producer, +) { + let time = ctx.time(); + let mut counter = 0u64; + loop { + let celsius = 20.0 + (counter as f64 * 0.5) + (counter as f64 % 10.0); + let duration = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap(); + let timestamp = + duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; + + let reading = Temperature { + sensor_id: format!("sensor-{:02}", (counter % 3) + 1), + celsius, + timestamp, + }; + + if let Err(e) = temperature.produce(reading).await { + tracing::error!("Failed to produce temperature: {}", e); + } + + counter += 1; + time.sleep(time.secs(2)).await; + } +} + +/// Fast tap on Temperature — just logs. Stage profiling will show it as +/// substantially faster than `slow_status_processor` on SystemStatus. +async fn temperature_logger( + _ctx: RuntimeContext, + consumer: Consumer, +) { + let Ok(mut reader) = consumer.subscribe() else { + tracing::error!("Failed to subscribe to Temperature"); + return; + }; + while let Ok(reading) = reader.recv().await { + tracing::debug!( + "🌡️ Logged temperature: {:.1} °C from {}", + reading.celsius, + reading.sensor_id + ); + } +} + +/// Periodically produces SystemStatus readings. +async fn system_status_simulator( + ctx: RuntimeContext, + status: Producer, +) { + let time = ctx.time(); + let mut uptime = 3600u64; + loop { + let snapshot = SystemStatus { + uptime_seconds: uptime, + cpu_usage: 10.0 + (uptime as f64 % 30.0), + memory_usage: 40.0 + ((uptime as f64 / 10.0) % 20.0), + }; + + if let Err(e) = status.produce(snapshot).await { + tracing::error!("Failed to produce system status: {}", e); + } + + uptime += 5; + time.sleep(time.secs(5)).await; + } +} + +/// Intentionally slow tap on SystemStatus — sleeps 100ms per value to simulate +/// expensive per-value processing. With profiling enabled, this stage shows up +/// as the per-record bottleneck in `get_stage_profiling`. +async fn slow_status_processor( + ctx: RuntimeContext, + consumer: Consumer, +) { + let time = ctx.time(); + let Ok(mut reader) = consumer.subscribe() else { + tracing::error!("Failed to subscribe to SystemStatus"); + return; + }; + while let Ok(snapshot) = reader.recv().await { + time.sleep(time.millis(100)).await; + tracing::debug!( + "💻 Processed status: CPU {:.1}%, MEM {:.1}%", + snapshot.cpu_usage, + snapshot.memory_usage + ); + } +} diff --git a/tools/aimdb-mcp/CHANGELOG.md b/tools/aimdb-mcp/CHANGELOG.md index 8e99333e..85e00168 100644 --- a/tools/aimdb-mcp/CHANGELOG.md +++ b/tools/aimdb-mcp/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Stage profiling tools** (Issue #58): Two new MCP tools surfaced from the `aimdb-core` `profiling` feature (enabled here on the `aimdb-core` dependency so the server always understands the `stage_profiling` metadata field). + - `get_stage_profiling` — per-stage wall-clock timing for `.source()` / `.tap()` / `.link()` callbacks of records matching a `record_key` substring. Returns per-stage `call_count` / `avg_time_ns` / `min_time_ns` / `max_time_ns` plus a `bottleneck` pointing at the stage with the highest average, with a human-readable `recommendation` string. Unnamed stages appear as `source[0]`, `tap[0]`, etc.; names set via `.with_name("...")` on the registrar surface as the `name` field. + - `reset_stage_profiling` — clears profiling counters for every record on the target instance. Issues the `profiling.reset` AimX request via `aimdb-client`; requires write permission. Falls back to a friendly `{ "reset": false, "message": ... }` response when the target was built without the `profiling` feature. - **Public mode** (`--public` flag): Restricts the server to read-only tools (`discover_instances`, `list_records`, `get_record`) for safe internet-facing deployments. Suppresses resources and prompts capabilities. Strips client-supplied `socket_path` arguments to prevent SSRF. - **Default socket flag** (`--socket `): Sets a default socket path at startup, removing the need for clients to pass `socket_path` on every call. Resolution order: explicit arg → `--socket` → `AIMDB_SOCKET` env var. - **CLI argument parsing**: Added `clap` dependency for structured CLI flags. diff --git a/tools/aimdb-mcp/Cargo.toml b/tools/aimdb-mcp/Cargo.toml index 9c2ef6ed..a693b319 100644 --- a/tools/aimdb-mcp/Cargo.toml +++ b/tools/aimdb-mcp/Cargo.toml @@ -30,6 +30,7 @@ aimdb-client = { version = "0.5.0", path = "../../aimdb-client" } aimdb-codegen = { version = "0.1.0", path = "../../aimdb-codegen" } aimdb-core = { version = "1.0.0", path = "../../aimdb-core", features = [ "std", + "profiling", ] } # Async runtime diff --git a/tools/aimdb-mcp/README.md b/tools/aimdb-mcp/README.md index 8ad97410..f422386a 100644 --- a/tools/aimdb-mcp/README.md +++ b/tools/aimdb-mcp/README.md @@ -214,6 +214,34 @@ Query: "Where is subscription data saved?" Result: Path to notification directory ``` +### 11. get_stage_profiling + +Show automatic per-stage timing — how long each `.source()` / `.tap()` / `.link()` +callback takes (wall-clock, including `.await` / I/O / sleeps) — for records +matching a key, and flag the slowest stage. Requires the target instance to be +built with the `profiling` feature; otherwise records carry no profiling data. + +``` +Query: "Which stage of my Temperature pipeline is the bottleneck?" + +Result: Per-stage call_count / avg / min / max (ns) plus a "bottleneck" pointing + at the stage with the highest average time, with a recommendation string. +``` + +Stage names come from `.with_name("...")` on the registrar; unnamed stages show as +`source[0]`, `tap[0]`, etc. + +### 12. reset_stage_profiling + +Reset stage profiling counters for every record on the target instance (requires +write permission and the `profiling` feature). Useful for windowed measurements. + +``` +Query: "Reset the stage profiling counters." + +Result: { "reset": true } +``` + ## Schema Inference The MCP server can infer JSON schemas from record values: diff --git a/tools/aimdb-mcp/src/server.rs b/tools/aimdb-mcp/src/server.rs index 4d9b7483..f36499bd 100644 --- a/tools/aimdb-mcp/src/server.rs +++ b/tools/aimdb-mcp/src/server.rs @@ -753,6 +753,39 @@ impl McpServer { "additionalProperties": false }), }, + Tool { + name: "get_stage_profiling".to_string(), + description: "Get automatic stage profiling (per-`.source()`/`.tap()`/`.link()` callback wall-clock timing) for records matching a key from a running AimDB instance, including the slowest stage ('bottleneck'). Requires the instance to be built with the `profiling` feature.".to_string(), + input_schema: json!({ + "type": "object", + "properties": { + "socket_path": { + "type": "string", + "description": "Unix socket path to the AimDB instance. Falls back to AIMDB_SOCKET env var if omitted." + }, + "record_key": { + "type": "string", + "description": "Substring to match against record names/keys (e.g., 'Temperature')" + } + }, + "required": ["record_key"], + "additionalProperties": false + }), + }, + Tool { + name: "reset_stage_profiling".to_string(), + description: "Reset stage profiling counters for every record on a running AimDB instance (requires write permission and the `profiling` feature).".to_string(), + input_schema: json!({ + "type": "object", + "properties": { + "socket_path": { + "type": "string", + "description": "Unix socket path to the AimDB instance. Falls back to AIMDB_SOCKET env var if omitted." + } + }, + "additionalProperties": false + }), + }, Tool { name: "save_memory".to_string(), description: "Persist ideation context and design rationale to .aimdb/memory.md. \ @@ -868,6 +901,8 @@ impl McpServer { tools::validate_against_instance(params.arguments).await? } "get_buffer_metrics" => tools::get_buffer_metrics(params.arguments).await?, + "get_stage_profiling" => tools::get_stage_profiling(params.arguments).await?, + "reset_stage_profiling" => tools::reset_stage_profiling(params.arguments).await?, "save_memory" => tools::save_memory(params.arguments).await?, "reset_session" => tools::reset_session(params.arguments).await?, _ => { diff --git a/tools/aimdb-mcp/src/tools/mod.rs b/tools/aimdb-mcp/src/tools/mod.rs index 7a8eecbb..3a985664 100644 --- a/tools/aimdb-mcp/src/tools/mod.rs +++ b/tools/aimdb-mcp/src/tools/mod.rs @@ -8,6 +8,7 @@ use once_cell::sync::OnceCell; pub mod architecture; pub mod graph; pub mod instance; +pub mod profiling; pub mod record; pub mod schema; @@ -57,5 +58,6 @@ pub use architecture::{ }; pub use graph::{graph_edges, graph_nodes, graph_topo_order}; pub use instance::{discover_instances, get_instance_info}; +pub use profiling::{get_stage_profiling, reset_stage_profiling}; pub use record::{drain_record, get_record, list_records, set_record}; pub use schema::query_schema; diff --git a/tools/aimdb-mcp/src/tools/profiling.rs b/tools/aimdb-mcp/src/tools/profiling.rs new file mode 100644 index 00000000..1e91ba20 --- /dev/null +++ b/tools/aimdb-mcp/src/tools/profiling.rs @@ -0,0 +1,130 @@ +//! Stage profiling tools. +//! +//! `get_stage_profiling` reports per-stage timing (`.source()`/`.tap()`/`.link()` +//! callback wall-clock time) for records matching a key, and flags the slowest +//! stage. `reset_stage_profiling` clears the counters on the server. +//! +//! These require the target AimDB instance to be built with the `profiling` +//! feature; without it, records simply carry no `stage_profiling` data. + +use crate::error::{McpError, McpResult}; +use aimdb_client::AimxClient; +use serde::Deserialize; +use serde_json::{json, Value}; +use tracing::debug; + +#[derive(Debug, Deserialize)] +struct GetStageProfilingParams { + socket_path: Option, + /// Substring matched against record name/key (e.g. `"Temperature"`). + record_key: String, +} + +#[derive(Debug, Deserialize)] +struct ResetStageProfilingParams { + socket_path: Option, +} + +async fn connect(socket_path: &str) -> McpResult { + if let Some(pool) = super::connection_pool() { + pool.get_connection(socket_path) + .await + .map_err(McpError::Client) + } else { + AimxClient::connect(socket_path) + .await + .map_err(McpError::Client) + } +} + +fn ms(ns: u64) -> f64 { + ns as f64 / 1_000_000.0 +} + +/// Returns per-stage profiling metrics for records matching `record_key`, plus a +/// `bottleneck` (the stage with the highest average wall-clock time). +pub async fn get_stage_profiling(args: Option) -> McpResult { + debug!("get_stage_profiling called"); + let params: GetStageProfilingParams = serde_json::from_value(args.unwrap_or(Value::Null)) + .map_err(|e| McpError::InvalidParams(format!("get_stage_profiling: {e}")))?; + let socket_path = super::resolve_socket_path(params.socket_path)?; + + let mut client = connect(&socket_path).await?; + let records = client.list_records().await.map_err(McpError::Client)?; + + let mut out = Vec::new(); + for rec in records { + if !(rec.name.contains(¶ms.record_key) || rec.record_key.contains(¶ms.record_key)) { + continue; + } + let stages = rec.stage_profiling.clone().unwrap_or_default(); + + // Bottleneck = stage with the highest avg_time_ns among those actually invoked. + let bottleneck = stages + .iter() + .filter(|s| s.call_count > 0) + .max_by_key(|s| s.avg_time_ns) + .map(|s| { + let label = s.name.clone().unwrap_or_else(|| format!("{}[{}]", s.stage_type, s.index)); + json!({ + "stage_type": s.stage_type, + "index": s.index, + "name": s.name, + "avg_time_ns": s.avg_time_ns, + "call_count": s.call_count, + "recommendation": format!( + "{} '{}' averages {:.2} ms per call ({} calls) — this is the slowest stage.", + s.stage_type, label, ms(s.avg_time_ns), s.call_count + ), + }) + }); + + out.push(json!({ + "record": rec.name, + "key": rec.record_key, + "profiling_enabled": rec.stage_profiling.is_some(), + "stages": serde_json::to_value(&stages)?, + "bottleneck": bottleneck, + })); + } + + if out.is_empty() { + return Ok(json!({ + "found": false, + "record_key": params.record_key, + "message": "No records matching this key were found.", + })); + } + + Ok(json!({ + "found": true, + "record_key": params.record_key, + "records": out, + })) +} + +/// Resets stage profiling counters for every record on the target instance. +pub async fn reset_stage_profiling(args: Option) -> McpResult { + debug!("reset_stage_profiling called"); + let params: ResetStageProfilingParams = serde_json::from_value(args.unwrap_or(Value::Null)) + .map_err(|e| McpError::InvalidParams(format!("reset_stage_profiling: {e}")))?; + let socket_path = super::resolve_socket_path(params.socket_path)?; + + let mut client = connect(&socket_path).await?; + match client.reset_stage_profiling().await { + Ok(_) => Ok(json!({ + "reset": true, + "message": "Stage profiling counters reset on all records.", + })), + Err(aimdb_client::ClientError::ServerError { ref code, .. }) + if code == "method_not_found" => + { + // The server was built without the `profiling` feature. + Ok(json!({ + "reset": false, + "message": "The target instance does not support profiling.reset (built without the `profiling` feature?).", + })) + } + Err(e) => Err(McpError::Client(e)), + } +}