Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .vscode/mcp.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@
"aimdb-weather": {
"type": "http",
"url": "http://aimdb.dev/mcp"
},
"aimdb-local": {
"type": "stdio",
"command": "cargo",
"args": [
"run",
"--manifest-path",
"${workspaceFolder}/tools/aimdb-mcp/Cargo.toml",
"--",
"--socket",
"/tmp/aimdb-demo.sock"
]
}
}
}
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **Automatic stage profiling (Issue #58, RFC 014)**: New `profiling` feature on `aimdb-core` automatically measures wall-clock time per `.source()` / `.tap()` / `.link()` stage and exposes `call_count` / `total` / `avg` / `min` / `max` counters per stage. Name stages with `RecordRegistrar::with_name("...")`. Works on `no_std + alloc` via the runtime clock (`aimdb_executor::TimeOps`) and `portable-atomic` for 64-bit atomics on `thumbv7em`. New MCP tools `get_stage_profiling` (with bottleneck detection + recommendation) and `reset_stage_profiling`. Feature is off by default and zero-cost when disabled. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md), [aimdb-client](aimdb-client/CHANGELOG.md), [tools/aimdb-mcp](tools/aimdb-mcp/CHANGELOG.md))
- **`remote-access-demo` exercises stage profiling**: The `Temperature` and `SystemStatus` records now run from in-AimDB `.source()` + `.tap()` tasks (with `.with_name(...)` stage labels) and the demo enables the `profiling` feature. `SystemStatus.slow_status_processor` deliberately sleeps 100 ms per value so `get_stage_profiling` flags it as the bottleneck. README documents how to query and reset profiling via MCP / `socat`.
- **`hello-mailbox` example (Issue #94)**: New minimal example demonstrating the Mailbox buffer (latest-wins) semantics using the synchronous API. First community contribution from [@ggmaldo](https://github.com/ggmaldo) — see [examples/hello-mailbox/](examples/hello-mailbox/).
- **Writer-exclusivity validation (Issue #89)**: Combining `.source()`, `.transform()`, and `.link_from()` on the same record now panics at configuration time with a clear message instead of silently producing a last-writer-wins race on the buffer. Multiple `.link_from()` inbound connectors (fan-in) remain allowed. ([aimdb-core](aimdb-core/CHANGELOG.md))
- **`no_std` Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets — no longer Tokio-only. Multi-input join fan-in moved out of `aimdb-core` into the new `JoinFanInRuntime` traits in `aimdb-executor`, with implementations in the Tokio (`mpsc::channel`, capacity 64), Embassy (`embassy_sync::Channel`, capacity 8), and WASM (`futures_channel::mpsc`, capacity 64) adapters. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-executor](aimdb-executor/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md))
Expand All @@ -51,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **aimdb-executor**: `TimeOps` trait gained a required `duration_as_nanos(duration) -> u64` method (runtime-agnostic numeric representation of elapsed time, used by stage profiling). Implemented in tokio, embassy, and wasm adapters. ([aimdb-executor](aimdb-executor/CHANGELOG.md))
- **aimdb-embassy-adapter**: `SpmcRing` subscriber-slot exhaustion now emits a `defmt::error!` with guidance to increase the `CONSUMERS` const generic. Counting rule: one slot per `.tap()`, `.link_to()`, and `transform_join` input.
- **aimdb-codegen**: Generated join handler stubs updated to the new `on_triggers` task model (`async fn task_handler(JoinEventRx, Producer<...>)`).
- **aimdb-core**: Breaking API changes to `InboundConnectorLink`, `Router`, and `RouterBuilder` to support `DeserializerKind` (see [aimdb-core/CHANGELOG.md](aimdb-core/CHANGELOG.md))
Expand Down
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,14 @@ build:
cargo build --package aimdb-core --no-default-features --features alloc
@printf "$(YELLOW) → Building aimdb-core (std platform)$(NC)\n"
cargo build --package aimdb-core --features "std,tracing,metrics"
@printf "$(YELLOW) → Building aimdb-core (no_std + alloc + profiling)$(NC)\n"
cargo build --package aimdb-core --no-default-features --features "alloc,profiling"
@printf "$(YELLOW) → Building aimdb-core (std + profiling)$(NC)\n"
cargo build --package aimdb-core --features "std,tracing,profiling"
@printf "$(YELLOW) → Building tokio adapter$(NC)\n"
cargo build --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics"
@printf "$(YELLOW) → Building tokio adapter (with profiling)$(NC)\n"
cargo build --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling"
@printf "$(YELLOW) → Building sync wrapper$(NC)\n"
cargo build --package aimdb-sync
@printf "$(YELLOW) → Building codegen library$(NC)\n"
Expand Down Expand Up @@ -90,12 +96,18 @@ test:
cargo test --package aimdb-core --features "std,tracing"
@printf "$(YELLOW) → Testing aimdb-core (std + metrics)$(NC)\n"
cargo test --package aimdb-core --features "std,tracing,metrics"
@printf "$(YELLOW) → Testing aimdb-core (std + profiling)$(NC)\n"
cargo test --package aimdb-core --features "std,tracing,profiling"
@printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + profiling)$(NC)\n"
cargo test --package aimdb-core --no-default-features --features "alloc,profiling"
@printf "$(YELLOW) → Testing aimdb-core remote module$(NC)\n"
cargo test --package aimdb-core --lib --features "std" remote::
@printf "$(YELLOW) → Testing tokio adapter$(NC)\n"
cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing"
@printf "$(YELLOW) → Testing tokio adapter (with metrics)$(NC)\n"
cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics"
@printf "$(YELLOW) → Testing tokio adapter (with profiling)$(NC)\n"
cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling"
@printf "$(YELLOW) → Testing sync wrapper$(NC)\n"
cargo test --package aimdb-sync
@printf "$(YELLOW) → Testing codegen library$(NC)\n"
Expand Down Expand Up @@ -243,6 +255,8 @@ test-embedded:
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime"
@printf "$(YELLOW) → Checking aimdb-embassy-adapter with network support on thumbv7em-none-eabihf target$(NC)\n"
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,embassy-net-support"
@printf "$(YELLOW) → Checking aimdb-embassy-adapter with profiling on thumbv7em-none-eabihf target$(NC)\n"
cargo check --package aimdb-embassy-adapter --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime,profiling"
@printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy) on thumbv7em-none-eabihf target$(NC)\n"
cargo check --package aimdb-mqtt-connector --target thumbv7em-none-eabihf --no-default-features --features "embassy-runtime"
@printf "$(YELLOW) → Checking aimdb-mqtt-connector (Embassy + defmt) on thumbv7em-none-eabihf target$(NC)\n"
Expand Down
2 changes: 1 addition & 1 deletion _external/embassy
Submodule embassy updated 136 files
4 changes: 3 additions & 1 deletion aimdb-client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

No changes yet.
### Added

- **`AimxClient::reset_stage_profiling()`** (Issue #58): New method issuing the `profiling.reset` AimX request to clear stage profiling counters for every record on the server. Requires the server to be built with the `profiling` feature and the connection to have write permission.

## [0.5.0] - 2026-02-21

Expand Down
8 changes: 8 additions & 0 deletions aimdb-client/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ impl AimxClient {
Ok(records)
}

/// Reset stage profiling counters for every record on the server.
///
/// Requires the server to be built with the `profiling` feature and the
/// connection to have write permission.
pub async fn reset_stage_profiling(&mut self) -> ClientResult<serde_json::Value> {
self.send_request("profiling.reset", None).await
}

/// Get current value of a record
pub async fn get_record(&mut self, name: &str) -> ClientResult<serde_json::Value> {
let params = json!({ "record": name });
Expand Down
10 changes: 10 additions & 0 deletions aimdb-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **Automatic stage profiling (Issue #58, RFC 014, feature `profiling`)**: AimDB now measures wall-clock time per `.source()`, `.tap()`, and `.link()` stage with no user instrumentation. Feature is off by default and adds zero overhead when disabled; `alloc` + a runtime clock is enough, so it works on `no_std + alloc` targets too.
- New `profiling` module exporting `StageMetrics` (atomic `call_count` / `total_time_ns` / `avg_time_ns` / `min_time_ns` / `max_time_ns` counters), `RecordProfilingMetrics` per-record container, and serializable `StageProfilingInfo` snapshot.
- Source-stage timing measures the interval between successive `Producer::produce()` calls via a new `ProducerProfilingState`. Tap- and link-stage timing wraps the `BufferReader` returned by `Consumer::subscribe()` in a new `ProfilingBufferReader` that times the interval between successive `recv()` yields. The whole-task closure shape of `.source()` / `.tap()` is preserved — no per-value handler changes.
- `RecordRegistrar::with_name("...")` assigns a human-readable name to the most recently registered source/tap/link; surfaces in MCP output. Always callable — a no-op when the feature is disabled.
- New `StageKind` enum (`Source` / `Tap` / `Link` / `Transform`); `.transform()` is stubbed for future instrumentation.
- `RecordMetadata` gains an optional `stage_profiling: Vec<StageProfilingInfo>` field (feature-gated) attached automatically in `TypedRecord::collect_metadata`. New helper `RecordMetadata::with_stage_profiling`.
- `AimDb::reset_stage_profiling()` clears every record's counters. New `profiling.reset` AimX RPC method (write-permission gated) wired through `remote::handler`.
- New `RuntimeForProfiling` marker trait — blanket-implemented for every `R` when the feature is off, requires `aimdb_executor::TimeOps` when on. Surfaces on `AimDbBuilder::run` / `build` and `AimDb::build_with`. Public API is unchanged when the feature is disabled.
- New `Time::duration_as_nanos` accessor on the context (delegates to `TimeOps`).
- Dependency: `portable-atomic` (with the `fallback` + `critical-section` features enabled by the `profiling` feature) for 64-bit-atomic emulation on targets without native `AtomicU64` (e.g. `thumbv7em-none-eabihf`).
- **Writer-exclusivity validation for `.link_from()` (Issue #89)**: `.source()`, `.transform()`, and `.link_from()` are now mutually exclusive on a single record — combining any two now panics at configuration time instead of silently racing on the buffer (last-writer-wins). The check fires from `LinkFromBuilder::finish()` (panic message includes the offending URL), with symmetric defense-in-depth checks added to `TypedRecord::set_producer_service`, `set_transform`, and `add_inbound_connector`. Multiple `.link_from()` calls on the same record (fan-in) remain permitted.
- **`no_std` support for the full Transform API (Design 027)**: `.transform()` and `.transform_join()` are now available on `no_std + alloc` targets. Multi-input join fan-in is no longer hardcoded to `tokio::sync::mpsc`; it uses the runtime-agnostic `JoinFanInRuntime` traits from `aimdb-executor`, implemented by Tokio, Embassy, and WASM adapters.
- **`JoinEventRx`** — type-erased trigger receiver passed to the `on_triggers` handler. Call `.recv().await` in a loop to consume `JoinTrigger` events from all input forwarders.
Expand Down
7 changes: 7 additions & 0 deletions aimdb-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ tracing = ["dep:tracing"] # Works in both std and no_std environments
defmt = ["dep:defmt"] # Embedded logging via probe (no_std)
metrics = ["dep:metrics", "std"] # Requires std for aggregation

# Automatic stage profiling (.source()/.tap()/.link() timing).
# Independent of `metrics`; works in no_std (only needs heap + a runtime clock).
# `portable-atomic/critical-section` provides the 64-bit-atomic fallback on targets
# without native 64-bit atomics (e.g. thumbv7em); it's a no-op where native atomics
# exist, and embedded binaries already supply a `critical-section` impl.
profiling = ["alloc", "portable-atomic/fallback", "portable-atomic/critical-section"]

# Testing features
test-utils = ["std"]

Expand Down
41 changes: 38 additions & 3 deletions aimdb-core/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ where
#[cfg(feature = "alloc")]
record_key: record_key.as_str().to_string(),
extensions: &self.extensions,
last_stage: None,
};
f(&mut reg);

Expand Down Expand Up @@ -675,7 +676,10 @@ where
/// .run().await // Runs forever
/// }
/// ```
pub async fn run(self) -> DbResult<()> {
pub async fn run(self) -> DbResult<()>
where
R: crate::RuntimeForProfiling,
{
#[cfg(feature = "tracing")]
tracing::info!("Building database and spawning background tasks...");

Expand Down Expand Up @@ -728,7 +732,10 @@ where
/// # Returns
/// `DbResult<AimDb<R>>` - The database instance
#[cfg_attr(not(feature = "std"), allow(unused_mut))]
pub async fn build(self) -> DbResult<AimDb<R>> {
pub async fn build(self) -> DbResult<AimDb<R>>
where
R: crate::RuntimeForProfiling,
{
use crate::DbError;

// Validate all records
Expand Down Expand Up @@ -836,9 +843,14 @@ where
extensions: self.extensions,
});

#[cfg(feature = "profiling")]
let profiling_clock = crate::profiling::make_clock(runtime.clone());

let db = Arc::new(AimDb {
inner: inner.clone(),
runtime: runtime.clone(),
#[cfg(feature = "profiling")]
profiling_clock,
});

#[cfg(feature = "tracing")]
Expand Down Expand Up @@ -1016,13 +1028,19 @@ pub struct AimDb<R: aimdb_executor::Spawn + 'static> {

/// Runtime adapter with concrete type
runtime: Arc<R>,

/// Shared wall clock for stage profiling, built from the runtime at `build()` time.
#[cfg(feature = "profiling")]
profiling_clock: crate::profiling::Clock,
}

impl<R: aimdb_executor::Spawn + 'static> Clone for AimDb<R> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
runtime: self.runtime.clone(),
#[cfg(feature = "profiling")]
profiling_clock: self.profiling_clock.clone(),
}
}
}
Expand Down Expand Up @@ -1050,8 +1068,17 @@ impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
&self.inner.extensions
}

/// Shared wall clock used by stage profiling (nanoseconds since an arbitrary epoch).
#[cfg(feature = "profiling")]
pub(crate) fn profiling_clock(&self) -> &crate::profiling::Clock {
&self.profiling_clock
}

/// Builds a database with a closure-based builder pattern
pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()> {
pub async fn build_with(rt: Arc<R>, f: impl FnOnce(&mut AimDbBuilder<R>)) -> DbResult<()>
where
R: crate::RuntimeForProfiling,
{
let mut b = AimDbBuilder::new().runtime(rt);
f(&mut b);
b.run().await
Expand Down Expand Up @@ -1238,6 +1265,14 @@ impl<R: aimdb_executor::Spawn + 'static> AimDb<R> {
self.inner.list_records()
}

/// Resets stage profiling counters for every record (feature `profiling`).
#[cfg(feature = "profiling")]
pub fn reset_stage_profiling(&self) {
for record in &self.inner.storages {
record.reset_profiling();
}
}

/// Try to get record's latest value as JSON by name (std only)
///
/// Convenience wrapper around `AimDbInner::try_latest_as_json()`.
Expand Down
5 changes: 5 additions & 0 deletions aimdb-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ impl<'a, R: Runtime> Time<'a, R> {
pub fn duration_since(&self, later: R::Instant, earlier: R::Instant) -> Option<R::Duration> {
self.ctx.runtime.duration_since(later, earlier)
}

/// Number of whole nanoseconds in a duration (runtime-agnostic).
pub fn duration_as_nanos(&self, duration: R::Duration) -> u64 {
self.ctx.runtime.duration_as_nanos(duration)
}
}

/// Log utilities accessor for RuntimeContext
Expand Down
27 changes: 26 additions & 1 deletion aimdb-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ mod error;
pub mod ext_macros;
pub mod extensions;
pub mod graph;
#[cfg(feature = "profiling")]
pub mod profiling;
pub mod record_id;
#[cfg(feature = "std")]
pub mod remote;
Expand All @@ -38,6 +40,25 @@ pub mod transport;
pub mod typed_api;
pub mod typed_record;

/// Marker trait used to add a `TimeOps` requirement to runtime-agnostic builder
/// methods *only* when the `profiling` feature is enabled.
///
/// When `profiling` is off this is a blanket no-op (every `R` implements it), so
/// the public API is unchanged. When `profiling` is on it requires
/// [`aimdb_executor::TimeOps`], which every real runtime adapter already provides
/// (it is part of [`aimdb_executor::Runtime`]).
#[cfg(feature = "profiling")]
pub trait RuntimeForProfiling: aimdb_executor::TimeOps {}
#[cfg(feature = "profiling")]
impl<R: aimdb_executor::TimeOps> RuntimeForProfiling for R {}

/// See the `profiling`-enabled definition above. Blanket no-op when `profiling`
/// is disabled.
#[cfg(not(feature = "profiling"))]
pub trait RuntimeForProfiling {}
#[cfg(not(feature = "profiling"))]
impl<R> RuntimeForProfiling for R {}

// Public API exports
pub use context::RuntimeContext;
pub use error::{DbError, DbResult};
Expand All @@ -58,9 +79,13 @@ pub use builder::OutboundRoute;
pub use builder::{AimDb, AimDbBuilder};
pub use connector::ConnectorBuilder;
pub use transport::{Connector, ConnectorConfig, PublishError};
pub use typed_api::{Consumer, Producer, RecordRegistrar, RecordT};
pub use typed_api::{Consumer, Producer, RecordRegistrar, RecordT, StageKind};
pub use typed_record::{AnyRecord, AnyRecordExt, TypedRecord};

// Stage profiling exports (feature-gated)
#[cfg(feature = "profiling")]
pub use profiling::{RecordProfilingMetrics, StageMetrics, StageProfilingInfo};

// Connector Infrastructure exports
pub use connector::TopicResolverFn;
pub use connector::{ConnectorClient, ConnectorLink, ConnectorUrl, SerializeError};
Expand Down
Loading