diff --git a/Cargo.lock b/Cargo.lock index 66d2bd9..0981546 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1672,6 +1672,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "hello-single-latest-async" +version = "1.0.0" +dependencies = [ + "aimdb-core", + "aimdb-tokio-adapter", + "tokio", +] + [[package]] name = "http" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index eb21e37..7d0c7b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ members = [ "examples/weather-mesh-demo/weather-station-beta", "examples/weather-mesh-demo/weather-station-gamma", "examples/hello-mailbox", + "examples/hello-single-latest-async", ] exclude = ["_external"] resolver = "2" diff --git a/Makefile b/Makefile index da21a53..65811ba 100644 --- a/Makefile +++ b/Makefile @@ -135,7 +135,7 @@ test: fmt: @printf "$(GREEN)Formatting code (workspace members only)...$(NC)\n" - @for pkg in aimdb-executor aimdb-derive aimdb-data-contracts aimdb-core aimdb-client aimdb-embassy-adapter aimdb-tokio-adapter aimdb-wasm-adapter aimdb-sync aimdb-persistence aimdb-persistence-sqlite aimdb-mqtt-connector aimdb-knx-connector aimdb-ws-protocol aimdb-websocket-connector aimdb-codegen aimdb-cli aimdb-mcp sync-api-demo tokio-mqtt-connector-demo embassy-mqtt-connector-demo tokio-knx-connector-demo embassy-knx-connector-demo weather-mesh-common weather-hub weather-station-alpha weather-station-beta hello-mailbox; do \ + @for pkg in aimdb-executor aimdb-derive aimdb-data-contracts aimdb-core aimdb-client aimdb-embassy-adapter aimdb-tokio-adapter aimdb-wasm-adapter aimdb-sync aimdb-persistence aimdb-persistence-sqlite aimdb-mqtt-connector aimdb-knx-connector aimdb-ws-protocol aimdb-websocket-connector aimdb-codegen aimdb-cli aimdb-mcp sync-api-demo tokio-mqtt-connector-demo embassy-mqtt-connector-demo tokio-knx-connector-demo embassy-knx-connector-demo weather-mesh-common weather-hub weather-station-alpha weather-station-beta hello-mailbox hello-single-latest-async; do \ printf "$(YELLOW) → Formatting $$pkg$(NC)\n"; \ cargo fmt -p $$pkg 2>/dev/null || true; \ done @@ -144,7 +144,7 @@ fmt: fmt-check: @printf "$(GREEN)Checking code formatting (workspace members only)...$(NC)\n" @FAILED=0; \ - for pkg in aimdb-executor aimdb-derive aimdb-data-contracts aimdb-core aimdb-client aimdb-embassy-adapter aimdb-tokio-adapter aimdb-wasm-adapter aimdb-sync aimdb-persistence aimdb-persistence-sqlite aimdb-mqtt-connector aimdb-knx-connector aimdb-ws-protocol aimdb-websocket-connector aimdb-codegen aimdb-cli aimdb-mcp sync-api-demo tokio-mqtt-connector-demo embassy-mqtt-connector-demo tokio-knx-connector-demo embassy-knx-connector-demo weather-mesh-common weather-hub weather-station-alpha weather-station-beta hello-mailbox; do \ + for pkg in aimdb-executor aimdb-derive aimdb-data-contracts aimdb-core aimdb-client aimdb-embassy-adapter aimdb-tokio-adapter aimdb-wasm-adapter aimdb-sync aimdb-persistence aimdb-persistence-sqlite aimdb-mqtt-connector aimdb-knx-connector aimdb-ws-protocol aimdb-websocket-connector aimdb-codegen aimdb-cli aimdb-mcp sync-api-demo tokio-mqtt-connector-demo embassy-mqtt-connector-demo tokio-knx-connector-demo embassy-knx-connector-demo weather-mesh-common weather-hub weather-station-alpha weather-station-beta hello-mailbox hello-single-latest-async; do \ printf "$(YELLOW) → Checking $$pkg$(NC)\n"; \ if ! cargo fmt -p $$pkg -- --check 2>&1; then \ printf "$(RED)❌ Formatting check failed for $$pkg$(NC)\n"; \ @@ -297,6 +297,8 @@ examples: cargo build --package weather-station-gamma --target thumbv7em-none-eabihf @printf "$(YELLOW) → Building hello-mailbox (sync)$(NC)\n" cargo build --package hello-mailbox + @printf "$(YELLOW) → Building hello-single-latest-async$(NC)\n" + cargo build --package hello-single-latest-async @printf "$(GREEN)All examples built successfully!$(NC)\n" ## Security & Quality commands diff --git a/README.md b/README.md index 965e324..a569279 100644 --- a/README.md +++ b/README.md @@ -121,8 +121,8 @@ docker compose up | Buffer | Semantics | Use cases | | --- | --- | --- | | **SPMC Ring** | Bounded stream with independent consumers | Sensor telemetry, event logs | -| **SingleLatest** | Only the current value matters | Feature flags, config, UI state | -| [**Mailbox**](https://github.com/aimdb-dev/aimdb/tree/main/examples/hello-mailbox) | Latest instruction wins | Device commands, actuation, RPC | +| [**SingleLatest**](examples/hello-single-latest-async) | Only the current value matters | Feature flags, config, UI state | +| [**Mailbox**](examples/hello-mailbox) | Latest instruction wins | Device commands, actuation, RPC | **Four capability traits** — opt-in, type-checked: diff --git a/examples/hello-single-latest-async/Cargo.toml b/examples/hello-single-latest-async/Cargo.toml new file mode 100644 index 0000000..2a0184d --- /dev/null +++ b/examples/hello-single-latest-async/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "hello-single-latest-async" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "AimDB minimal async example demonstrating SingleLatest buffer semantics" +publish = false + +[dependencies] +aimdb-core = { path = "../../aimdb-core", features = ["std"] } +aimdb-tokio-adapter = { path = "../../aimdb-tokio-adapter", features = ["tokio-runtime"] } +tokio = { workspace = true, features = ["time"] } diff --git a/examples/hello-single-latest-async/README.md b/examples/hello-single-latest-async/README.md new file mode 100644 index 0000000..34f1c9b --- /dev/null +++ b/examples/hello-single-latest-async/README.md @@ -0,0 +1,37 @@ +# hello-single-latest-async: SingleLatest buffer demo + +The `SingleLatest` buffer stores the current value for a record. New writes replace the previous value, so subscribers read the latest state instead of replaying every intermediate update. Use it for feature flags, configuration, UI state, or other records where stale values should be skipped. + +## How it works + +This example registers a `FeatureGate` record with: + +- `BufferCfg::SingleLatest` +- an async `.source()` that publishes rollout percentages +- an async `.tap()` that observes the latest value whenever it changes + +The source sends an initial burst of updates without waiting between writes. The tap prints the latest observed rollout, demonstrating that the buffer carries current state rather than a full event log. + +## How to run + +From the workspace root, run: + +```bash +cargo run -p hello-single-latest-async +``` + +Expected output includes lines similar to: + +```text +=== hello-single-latest-async: SingleLatest buffer demo === + +source published rollout: 0% +source published rollout: 10% +source published rollout: 25% +tap observed current rollout: 25% +source published rollout: 50% +tap observed current rollout: 50% +source published rollout: 100% +tap observed current rollout: 100% +Done. SingleLatest keeps only the current value for each subscriber. +``` diff --git a/examples/hello-single-latest-async/src/main.rs b/examples/hello-single-latest-async/src/main.rs new file mode 100644 index 0000000..27e455f --- /dev/null +++ b/examples/hello-single-latest-async/src/main.rs @@ -0,0 +1,85 @@ +use aimdb_core::{buffer::BufferCfg, AimDbBuilder, Consumer, Producer, RuntimeContext}; +use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt}; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Clone, Debug)] +struct FeatureGate { + rollout_percent: u8, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("=== hello-single-latest-async: SingleLatest buffer demo ===\n"); + + let adapter = Arc::new(TokioAdapter::new()?); + let mut builder = AimDbBuilder::new().runtime(adapter); + + builder.configure::("config.checkout_rollout", |reg| { + reg.buffer(BufferCfg::SingleLatest) + .source(rollout_source) + .tap(rollout_observer); + }); + + let _db = builder.build().await?; + + tokio::time::sleep(Duration::from_millis(700)).await; + println!("Done. SingleLatest keeps only the current value for each subscriber."); + + Ok(()) +} + +async fn rollout_source( + ctx: RuntimeContext, + producer: Producer, +) { + let time = ctx.time(); + + time.sleep(time.millis(50)).await; + + for rollout_percent in [0, 10, 25] { + publish_rollout(&producer, rollout_percent).await; + } + + for rollout_percent in [50, 100] { + time.sleep(time.millis(120)).await; + publish_rollout(&producer, rollout_percent).await; + } +} + +async fn publish_rollout(producer: &Producer, rollout_percent: u8) { + let gate = FeatureGate { rollout_percent }; + match producer.produce(gate).await { + Ok(()) => println!("source published rollout: {rollout_percent}%"), + Err(err) => eprintln!("failed to publish rollout {rollout_percent}%: {err}"), + } +} + +async fn rollout_observer( + ctx: RuntimeContext, + consumer: Consumer, +) { + let Ok(mut reader) = consumer.subscribe() else { + eprintln!("failed to subscribe to config.checkout_rollout"); + return; + }; + let time = ctx.time(); + + let mut first = true; + while let Ok(gate) = reader.recv().await { + if first { + first = false; + if gate.rollout_percent != 0 { + println!( + " (rollouts before {}% were overwritten before the tap could read them - SingleLatest keeps only the latest)", + gate.rollout_percent + ); + } + } + println!("tap observed current rollout: {}%", gate.rollout_percent); + if gate.rollout_percent == 100 { + break; + } + time.sleep(time.millis(90)).await; + } +}