From 28547fea14835c300ab90e2e5126ac62b327991b Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 14 Apr 2026 12:39:20 +0200 Subject: [PATCH 01/19] add the RFC --- ...2026-04-13-component-cpu-seconds-metric.md | 395 ++++++++++++++++++ 1 file changed, 395 insertions(+) create mode 100644 rfcs/2026-04-13-component-cpu-seconds-metric.md diff --git a/rfcs/2026-04-13-component-cpu-seconds-metric.md b/rfcs/2026-04-13-component-cpu-seconds-metric.md new file mode 100644 index 0000000000000..0b32e969a0ec0 --- /dev/null +++ b/rfcs/2026-04-13-component-cpu-seconds-metric.md @@ -0,0 +1,395 @@ +# RFC 2026-04-13 - Per-component CPU time metric for sync transforms + +The current `utilization` gauge measures the fraction of wall-clock time a +component is not idle (i.e., not waiting on its input channel). Because sync +and function transforms can run concurrently across multiple tokio worker +threads, and because wall-clock "not idle" includes time the OS has preempted +the thread, this gauge does not accurately reflect how much CPU a component +actually consumes. This RFC proposes a new **counter** metric, +`component_cpu_seconds_total`, that tracks the cumulative CPU time consumed +by a component's transform work, measured via OS thread-level CPU clocks. + +## Context + +- The existing `utilization` metric is implemented in `src/utilization.rs`. +- Sync and function transforms are spawned in `src/topology/builder.rs` + via the `Runner` struct (`run_inline` and `run_concurrently` methods). +- The `enable_concurrency` trait method controls whether a transform is + dispatched to parallel `tokio::spawn` tasks (up to + `TRANSFORM_CONCURRENCY_LIMIT`, which defaults to the number of worker + threads). + +## Cross cutting concerns + +- The `utilization` gauge remains as-is. This RFC adds a complementary metric; + it does not replace the existing one. +- Future work could extend this approach to task transforms and sinks. + +## Scope + +### In scope + +- A new `component_cpu_seconds_total` counter for **sync and function + transforms** (both inline and concurrent execution paths). +- Two implementation tiers: a wall-clock fallback that works everywhere, and a + precise thread-CPU-time implementation using OS APIs. +- Feasibility analysis of thread-level CPU time measurement. + +### Out of scope + +- Task transforms (async stream-based). Their execution interleaves with the + tokio runtime in ways that make per-poll CPU measurement a distinct problem. + Furthermore, all task transforms in Vector are currently single-threaded (they + do not parallelize work), making the `utilization` metric a good indicator of + their actual usage. +- Sources and sinks. +- Replacing or modifying the existing `utilization` gauge. + +## Pain + +1. **Utilization is misleading under concurrency.** In the concurrent + `run_concurrently` path, the utilization timer stays in "not waiting" state + from the moment events are received (`stop_wait` in `on_events_received`) + until a completed task's output is sent (`start_wait` in `send_outputs`). + The actual CPU work happens on separate `tokio::spawn`'d tasks that the + timer does not track. This means utilization measures **occupancy** (is + there at least one batch in flight?) rather than CPU consumption. + + Concrete example: a concurrent remap with 4 in-flight tasks each taking + 10ms, input arriving every 5ms. Input arrives frequently enough that + `stop_wait` fires before each spawn, keeping the timer in "not waiting" + almost continuously → utilization ≈ 100%. But actual CPU consumption is + 4 × 10ms / 20ms = 2 cores. The utilization gauge cannot distinguish + "2 cores" from "0.3 cores at 100% occupancy." + +2. **No way to detect CPU-bound transforms.** Operators tuning pipelines need to + know which transforms are CPU-bottlenecked. A `cpu_seconds_total` counter, + when divided by wall-clock time, directly gives CPU core utilization and can + exceed 1.0 when a transform genuinely uses multiple cores. + +## Proposal + +### User Experience + +A new counter metric is emitted for every sync/function transform: + +```prometheus +component_cpu_seconds_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14.207 +``` + +Operators use it exactly like `process_cpu_seconds_total` from the Prometheus +ecosystem: + +```promql +# Per-component CPU core usage (can exceed 1.0 with concurrency) +component_cpu_seconds_total{component_id="my_remap"}.as_rate() + +# Compare against utilization to spot backpressure vs CPU bottleneck +rate(component_cpu_seconds_total{component_id="my_remap"}[1m]) + / + utilization{component_id="my_remap"} +``` + +This metric is always emitted for sync/function transforms; there is no +configuration knob. + +### Implementation + +#### Metric definition + +Register a `counter!("component_cpu_seconds_total")` in the `Runner` struct, +alongside the existing `timer_tx` and `events_received` fields: + +```rust +struct Runner { + transform: Box, + input_rx: Option>, + input_type: DataType, + outputs: TransformOutputs, + timer_tx: UtilizationComponentSender, + latency_recorder: LatencyRecorder, + events_received: Registered, + cpu_seconds: Counter, // NEW +} +``` + +#### Supported OS: Thread CPU time (precise, Linux and macOS) + +For precise measurement, we read the calling thread's CPU clock before and after +`transform_all`. This counts only time the thread was actually scheduled on a +CPU, excluding preemption, involuntary context switches, and any time another +process used the core. + +**Linux and macOS — `clock_gettime(CLOCK_THREAD_CPUTIME_ID)`** + +```rust +#[cfg(any(target_os = "linux", target_os = "macos"))] +fn thread_cpu_time() -> Duration { + let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 }; + // SAFETY: ts is a valid pointer to a timespec struct and + // CLOCK_THREAD_CPUTIME_ID is a valid clock id on Linux >= 2.6 + // and macOS >= 10.12. + unsafe { + libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts); + } + Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32) +} +``` + +This API is available on both Linux and macOS (since 10.12 Sierra, 2016). It +measures CPU time for the **calling thread** with nanosecond granularity. +Since `transform_all` is synchronous and runs entirely within a single thread +poll, the delta between two calls around `transform_all` gives exact CPU time +consumed by that transform invocation. + +**Overhead:** On Linux, `clock_gettime(CLOCK_THREAD_CPUTIME_ID)` is +vDSO-accelerated and costs ~20-60ns per call. On macOS the kernel path is +slightly heavier (~100-200ns) but still negligible compared to actual transform +work. With two calls per `transform_all` invocation the total overhead is +well under 500ns per batch on either platform. + +**Windows — `GetThreadTimes`** + +Windows exposes per-thread CPU time via `GetThreadTimes`. A precise +implementation is feasible but requires adding a `windows-sys` (or `winapi`) +dependency, which is not currently used by Vector. The implementation is +deferred; Windows falls back to wall-clock time for now (see below). + +```rust +// Future work — not yet implemented. +#[cfg(target_os = "windows")] +fn thread_cpu_time() -> Duration { + use windows_sys::Win32::System::Threading::*; + let mut creation = 0u64; + let mut exit = 0u64; + let mut kernel = 0u64; + let mut user = 0u64; + // SAFETY: GetCurrentThread() returns a pseudo-handle that is always valid. + unsafe { + GetThreadTimes( + GetCurrentThread(), + &mut creation as *mut _ as *mut _, + &mut exit as *mut _ as *mut _, + &mut kernel as *mut _ as *mut _, + &mut user as *mut _ as *mut _, + ); + } + // FILETIME is in 100ns units + Duration::from_nanos((kernel + user) * 100) +} +``` + +#### Fallback: Wall-clock timing (Windows and other platforms) + +On Windows and any other platform not covered above, fall back to wall-clock +time: + +```rust +fn thread_cpu_time() -> Duration { + Instant::now() +} +``` + +This is simple and portable. Its accuracy is good for CPU-bound sync transforms +because `transform_all` is a synchronous call that does not yield to the tokio +runtime. The main source of inaccuracy is OS-level thread preemption: if the OS +schedules another process onto the core during `transform_all`, that wall-clock +time is counted as CPU time even though Vector was not executing. + +For small workloads (lightly loaded hosts, transforms that complete in +microseconds to low milliseconds), the preemption error is negligible. + +#### Why thread CPU time works for sync transforms + +The critical property that makes this approach accurate is that `transform_all` +is a **synchronous, non-yielding** call. When the tokio runtime polls the future +containing `transform_all`: + +1. The runtime's worker thread enters the poll. +2. `transform_all` executes to completion without any `.await` points. +3. Control returns to the runtime. + +Between steps 1 and 3, the worker thread is exclusively executing transform +code. Reading the thread CPU clock before and after captures exactly the CPU +time consumed, regardless of: + +- Other tokio tasks that may be queued (they can't preempt a synchronous call). +- OS preemption (thread CPU clock excludes time spent not running). +- Other concurrent `tokio::spawn` tasks on different threads (each measures its + own thread independently). + +This would **not** work for async task transforms, where a single `poll` may +interleave with unrelated futures on the same worker thread. + +#### Concurrent execution and multi-core accounting + +In the concurrent path (`run_concurrently`), each `tokio::spawn` task measures +its own CPU time independently. If 4 tasks each consume 0.25s of CPU in +parallel, the counter increments by 1.0s total — correctly reflecting that the +transform used 1.0 CPU-second even though only 0.25s of wall time elapsed. + +This matches the semantics of `process_cpu_seconds_total` and Linux's +`/proc/[pid]/stat` CPU accounting: the counter can increase faster than +wall-clock time when work is parallelized. + +#### Integration into the Runner + +```rust +impl Runner { + async fn run_inline(mut self) -> TaskResult { + const INLINE_BATCH_SIZE: usize = 128; + let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE); + let mut input_rx = self.input_rx.take().expect("can't run runner twice") + .into_stream() + .filter(move |events| ready(filter_events_type(events, self.input_type))); + + self.timer_tx.try_send_start_wait(); + while let Some(events) = input_rx.next().await { + self.on_events_received(&events); + + let cpu_before = thread_cpu_time(); // NEW + self.transform.transform_all(events, &mut outputs_buf); + let cpu_after = thread_cpu_time(); // NEW + let cpu_delta = cpu_after.saturating_sub(cpu_before); + self.cpu_seconds.increment(cpu_delta.as_secs_f64()); + + self.send_outputs(&mut outputs_buf).await + .map_err(TaskError::wrapped)?; + } + Ok(TaskOutput::Transform) + } + + async fn run_concurrently(mut self) -> TaskResult { + // ... (existing setup) ... + input_arrays = input_rx.next(), ... => { + match input_arrays { + Some(input_arrays) => { + // ... (existing event counting) ... + let mut t = self.transform.clone(); + let mut outputs_buf = self.outputs.new_buf_with_capacity(len); + let cpu_counter = self.cpu_seconds.clone(); // NEW + let task = tokio::spawn(async move { + let cpu_before = thread_cpu_time(); // NEW + for events in input_arrays { + t.transform_all(events, &mut outputs_buf); + } + let cpu_after = thread_cpu_time(); // NEW + let cpu_delta = cpu_after.saturating_sub(cpu_before); + cpu_counter.increment(cpu_delta.as_secs_f64()); + outputs_buf + }.in_current_span()); + in_flight.push_back(task); + } + // ... + } + } + // ... + } +} +``` + +#### Module structure + +Add a new module `src/cpu_time.rs`: + +```rust +/// Returns the CPU time consumed by the calling thread. +/// +/// On Linux and macOS, uses clock_gettime(CLOCK_THREAD_CPUTIME_ID) (nanosecond precision). +/// On other platforms, falls back to Instant::now() (wall-clock time). +pub(crate) fn thread_cpu_time() -> Duration { ... } +``` + +This keeps the platform-specific FFI contained in one file and testable +independently. + +## Rationale + +- **Direct CPU cost visibility.** Operators can identify which transforms are + CPU-bottlenecked vs. backpressure-limited, enabling informed tuning. +- **Composable with existing metrics.** `rate(cpu_seconds[1m])` gives CPU + cores used; dividing by `utilization` separates CPU from pipeline effects. +- **Low overhead.** Two `clock_gettime` calls per batch (~80ns total on Linux) + is negligible relative to the work `transform_all` performs. +- **Familiar semantics.** Follows the `cpu_seconds_total` convention from + Prometheus, Linux `/proc`, and cAdvisor — operators already know how to use it. + +## Drawbacks + +- **Platform-specific code.** The precise implementation uses `cfg`-gated FFI + for Linux and macOS. Windows and other platforms fall back to wall-clock time. +- **Windows accuracy gap.** Windows ships a suitable API (`GetThreadTimes`) but + it is not yet implemented because it requires adding a `windows-sys` dependency + that Vector does not currently carry. Windows users get wall-clock time until + that is addressed. + +## Alternatives + +### Extend the existing utilization gauge + +Add a CPU-time-based "utilization v2" that replaces the current gauge. + +**Rejected because:** The current utilization metric serves a different purpose +(pipeline flow analysis: is this component starved or saturated?). CPU time is a +complementary signal, not a replacement. Conflating them would lose information. + +### Per-event latency histogram + +Emit a histogram of per-event processing time instead of a cumulative counter. + +**Rejected because:** Histograms are expensive at high throughput (Vector +processes millions of events/sec). A counter that increments once per batch is +far cheaper. Per-event latency can be derived from the counter and +`events_sent_total` if needed (`cpu_seconds / events = avg cpu per event`). + +### `getrusage(RUSAGE_THREAD)` instead of `clock_gettime` + +On Linux, `getrusage(RUSAGE_THREAD)` also provides per-thread CPU time (as +`ru_utime` + `ru_stime`). + +**Not preferred because:** `clock_gettime(CLOCK_THREAD_CPUTIME_ID)` has +nanosecond precision vs. microsecond for `getrusage`. Both are vDSO-accelerated +on modern kernels. The higher precision is worth the identical cost. + +## Outstanding Questions + +1. **Metric name:** Should it be `component_cpu_seconds_total` (following + Prometheus conventions and the `component_` prefix used by existing Vector + metrics) or `transform_cpu_seconds_total` (since only transforms emit it + initially)? The `component_` prefix is preferred for consistency and to allow + future extension to sources and sinks. +2. **User/system split:** Should we report user and system CPU time separately + (as `mode="user"` / `mode="system"` tags) like `host_cpu_seconds_total` + does? The Linux API supports this. It adds cardinality but helps distinguish + transforms that trigger syscalls (e.g., enrichment table lookups) from pure + computation. + +## Plan Of Attack + +- Add `src/cpu_time.rs` module with `thread_cpu_time()` and platform-specific + implementations behind `#[cfg]` gates. Include unit tests that verify the + returned duration is non-zero and monotonically increasing. +- Register `counter!("component_cpu_seconds_total")` in `Runner::new` and + instrument `run_inline` with wall-clock timing (Tier 1). +- Instrument `run_concurrently` with wall-clock timing (Tier 1). Verify the + counter increments correctly when multiple tasks run in parallel. +- Switch from `Instant::now()` to `thread_cpu_time()` (Tier 2). Benchmark + the overhead on Linux to confirm it is <100ns per call. +- Add integration test: run a CPU-intensive remap transform, verify + `component_cpu_seconds_total` is within 10% of expected CPU time. +- Add documentation for the new metric in the generated component docs. +- Add changelog fragment. + +## Future Improvements + +- Extend `component_cpu_seconds_total` to **task transforms** by measuring CPU + time per `poll` of the transform stream. This requires careful accounting to + exclude time spent in the tokio runtime between polls. +- Extend to **sources and sinks** where the component owns a synchronous + processing step (e.g., codec encoding in sinks). +- Expose a derived `**cpu_utilization` gauge\*\* (CPU seconds / wall seconds) + computed by the `UtilizationEmitter` for operators who prefer a ready-to-use + ratio. +- Add `mode="user"` / `mode="system"` tag split for deeper CPU profiling. +- Implement the Windows `GetThreadTimes` path once a `windows-sys` dependency + is available in Vector. From a078c1ec7e3a9cf887784e69657cc5ee5285eb81 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 14 Apr 2026 14:17:27 +0200 Subject: [PATCH 02/19] add impl for linux and macos --- Cargo.toml | 1 + src/cpu_time.rs | 116 ++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/topology/builder.rs | 10 +++- 4 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 src/cpu_time.rs diff --git a/Cargo.toml b/Cargo.toml index 84d89c50096f9..eb80e74fc4561 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -396,6 +396,7 @@ itertools.workspace = true k8s-openapi = { version = "0.27.0", default-features = false, features = ["v1_31"], optional = true } kube = { version = "3.0.1", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true } listenfd = { version = "1.0.2", default-features = false, optional = true } +libc.workspace = true lru = { version = "0.16.3", default-features = false } maxminddb = { version = "0.27.0", default-features = false, optional = true, features = ["simdutf8"] } md-5 = { version = "0.10", default-features = false, optional = true } diff --git a/src/cpu_time.rs b/src/cpu_time.rs new file mode 100644 index 0000000000000..097a41bfff838 --- /dev/null +++ b/src/cpu_time.rs @@ -0,0 +1,116 @@ +use std::time::Duration; + +/// An opaque snapshot of thread CPU time. +/// +/// On Linux and macOS this uses `CLOCK_THREAD_CPUTIME_ID`, which measures +/// only the time the calling thread was actually scheduled on a CPU (true CPU +/// time, excluding preemption and context switches to other threads/processes). +/// +/// On other platforms this falls back to wall-clock time via +/// [`std::time::Instant`]. +/// +/// # Usage +/// +/// Call [`ThreadTime::now`] immediately before the work to measure, then call +/// [`ThreadTime::elapsed`] immediately after: +/// +/// ```ignore +/// let t0 = ThreadTime::now(); +/// do_work(); +/// let cpu_time = t0.elapsed(); +/// ``` +/// +/// # Correctness for sync transforms +/// +/// This measurement is accurate for [`crate::transforms::SyncTransform`] +/// because `transform_all` is synchronous and non-yielding: between the two +/// measurement points the worker thread runs only transform code, with no +/// `.await` points that could interleave other tokio tasks. +pub(crate) struct ThreadTime(Inner); + +impl ThreadTime { + /// Captures the current thread CPU time. + #[inline] + pub(crate) fn now() -> Self { + ThreadTime(Inner::now()) + } + + /// Returns the CPU time elapsed since this snapshot was taken. + #[inline] + pub(crate) fn elapsed(&self) -> Duration { + self.0.elapsed() + } +} + +// ── Linux / macOS: CLOCK_THREAD_CPUTIME_ID ──────────────────────────────── + +#[cfg(any(target_os = "linux", target_os = "macos"))] +struct Inner(Duration); + +#[cfg(any(target_os = "linux", target_os = "macos"))] +impl Inner { + fn now() -> Self { + let mut ts = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + // SAFETY: + // - `ts` is a valid, zero-initialised `timespec` on the stack. + // - `CLOCK_THREAD_CPUTIME_ID` is a valid clock ID on Linux ≥ 2.6 and + // macOS ≥ 10.12 (both are baseline requirements for Vector). + // - The return value is intentionally ignored: the only failure modes + // are an invalid clock ID (not the case here) or an invalid pointer + // (not the case here), neither of which can occur. + unsafe { + libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts); + } + Inner(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32)) + } + + #[inline] + fn elapsed(&self) -> Duration { + Self::now().0.saturating_sub(self.0) + } +} + +// ── Other platforms: wall-clock fallback ────────────────────────────────── + +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +struct Inner(std::time::Instant); + +#[cfg(not(any(target_os = "linux", target_os = "macos")))] +impl Inner { + fn now() -> Self { + Inner(std::time::Instant::now()) + } + + #[inline] + fn elapsed(&self) -> Duration { + self.0.elapsed() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn elapsed_is_non_negative() { + let t0 = ThreadTime::now(); + // Burn a small amount of CPU to ensure the clock advances. + let _: u64 = (0u64..10_000).sum(); + assert!(t0.elapsed() >= Duration::ZERO); + } + + #[test] + fn elapsed_is_monotone() { + // Two consecutive elapsed() calls on the same snapshot must be + // non-decreasing (the clock never goes backwards). + let t0 = ThreadTime::now(); + let _: u64 = (0u64..10_000).sum(); + let first = t0.elapsed(); + let _: u64 = (0u64..10_000).sum(); + let second = t0.elapsed(); + assert!(second >= first, "clock went backwards: {second:?} < {first:?}"); + } +} diff --git a/src/lib.rs b/src/lib.rs index 8d26a3b080ecf..41fcdefcd9695 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,6 +126,7 @@ pub mod trace; pub mod transforms; pub mod types; pub mod unit_test; +pub(crate) mod cpu_time; pub(crate) mod utilization; pub mod validate; #[cfg(windows)] diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 8048b7a6073ed..87e62d1779f7e 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -8,7 +8,7 @@ use std::{ use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered}; use futures_util::stream::FuturesUnordered; -use metrics::gauge; +use metrics::{Gauge, gauge}; use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire}; use tokio::{ select, @@ -41,6 +41,7 @@ use super::{ }; use crate::{ SourceSender, + cpu_time::ThreadTime, config::{ ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId, ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput, @@ -1130,6 +1131,7 @@ struct Runner { timer_tx: UtilizationComponentSender, latency_recorder: LatencyRecorder, events_received: Registered, + cpu_seconds: Gauge, } impl Runner { @@ -1149,6 +1151,7 @@ impl Runner { timer_tx, latency_recorder, events_received: register!(EventsReceived), + cpu_seconds: gauge!("component_cpu_seconds_total"), } } @@ -1184,7 +1187,9 @@ impl Runner { self.timer_tx.try_send_start_wait(); while let Some(events) = input_rx.next().await { self.on_events_received(&events); + let t0 = ThreadTime::now(); self.transform.transform_all(events, &mut outputs_buf); + self.cpu_seconds.increment(t0.elapsed().as_secs_f64()); self.send_outputs(&mut outputs_buf) .await .map_err(TaskError::wrapped)?; @@ -1233,10 +1238,13 @@ impl Runner { let mut t = self.transform.clone(); let mut outputs_buf = self.outputs.new_buf_with_capacity(len); + let cpu_seconds = self.cpu_seconds.clone(); let task = tokio::spawn(async move { + let t0 = ThreadTime::now(); for events in input_arrays { t.transform_all(events, &mut outputs_buf); } + cpu_seconds.increment(t0.elapsed().as_secs_f64()); outputs_buf }.in_current_span()); in_flight.push_back(task); From c3eae53d0124253233a6788171fb4ed603111ff3 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 14 Apr 2026 14:21:59 +0200 Subject: [PATCH 03/19] add windows implementation --- Cargo.lock | 1 + Cargo.toml | 1 + ...2026-04-13-component-cpu-seconds-metric.md | 53 ++++++++-------- src/cpu_time.rs | 60 ++++++++++++++++++- 4 files changed, 84 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d013bc09432ec..bb18fb96c7a97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12435,6 +12435,7 @@ dependencies = [ "warp", "windows 0.58.0", "windows-service", + "windows-sys 0.52.0", "wiremock", "zstd 0.13.2", ] diff --git a/Cargo.toml b/Cargo.toml index eb80e74fc4561..bf9998c89697b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -452,6 +452,7 @@ sysinfo = "0.37.2" byteorder = "1.5.0" [target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.52", features = ["Win32_Foundation", "Win32_System_Threading"] } windows-service = "0.8.0" windows = { version = "0.58", features = ["Win32_System_EventLog", "Win32_Foundation", "Win32_System_Com", "Win32_Security", "Win32_Security_Authorization", "Win32_System_Threading", "Win32_Storage_FileSystem"], optional = true } quick-xml = { version = "0.31", default-features = false, features = ["serialize"], optional = true } diff --git a/rfcs/2026-04-13-component-cpu-seconds-metric.md b/rfcs/2026-04-13-component-cpu-seconds-metric.md index 0b32e969a0ec0..7397c2deb9805 100644 --- a/rfcs/2026-04-13-component-cpu-seconds-metric.md +++ b/rfcs/2026-04-13-component-cpu-seconds-metric.md @@ -150,39 +150,39 @@ well under 500ns per batch on either platform. **Windows — `GetThreadTimes`** -Windows exposes per-thread CPU time via `GetThreadTimes`. A precise -implementation is feasible but requires adding a `windows-sys` (or `winapi`) -dependency, which is not currently used by Vector. The implementation is -deferred; Windows falls back to wall-clock time for now (see below). +Windows exposes per-thread CPU time via `GetThreadTimes`, providing the same +guarantee as `CLOCK_THREAD_CPUTIME_ID` with 100ns granularity. It is +implemented using the `windows-sys` crate (added as a +`[target.'cfg(windows)'.dependencies]`), which is already a transitive +dependency of Vector. ```rust -// Future work — not yet implemented. #[cfg(target_os = "windows")] fn thread_cpu_time() -> Duration { - use windows_sys::Win32::System::Threading::*; - let mut creation = 0u64; - let mut exit = 0u64; - let mut kernel = 0u64; - let mut user = 0u64; + use windows_sys::Win32::Foundation::FILETIME; + use windows_sys::Win32::System::Threading::{GetCurrentThread, GetThreadTimes}; + + let mut creation = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + let mut exit = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + let mut kernel = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + let mut user = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + // SAFETY: GetCurrentThread() returns a pseudo-handle that is always valid. unsafe { - GetThreadTimes( - GetCurrentThread(), - &mut creation as *mut _ as *mut _, - &mut exit as *mut _ as *mut _, - &mut kernel as *mut _ as *mut _, - &mut user as *mut _ as *mut _, - ); + GetThreadTimes(GetCurrentThread(), &mut creation, &mut exit, &mut kernel, &mut user); } - // FILETIME is in 100ns units - Duration::from_nanos((kernel + user) * 100) + + // FILETIME stores time as 100ns intervals split across two u32 fields. + let to_nanos = |ft: FILETIME| { + (((ft.dwHighDateTime as u64) << 32) | ft.dwLowDateTime as u64) * 100 + }; + Duration::from_nanos(to_nanos(kernel) + to_nanos(user)) } ``` -#### Fallback: Wall-clock timing (Windows and other platforms) +#### Fallback: Wall-clock timing (other platforms) -On Windows and any other platform not covered above, fall back to wall-clock -time: +On any platform not covered above, fall back to wall-clock time: ```rust fn thread_cpu_time() -> Duration { @@ -317,11 +317,8 @@ independently. ## Drawbacks - **Platform-specific code.** The precise implementation uses `cfg`-gated FFI - for Linux and macOS. Windows and other platforms fall back to wall-clock time. -- **Windows accuracy gap.** Windows ships a suitable API (`GetThreadTimes`) but - it is not yet implemented because it requires adding a `windows-sys` dependency - that Vector does not currently carry. Windows users get wall-clock time until - that is addressed. + for Linux, macOS, and Windows. Other platforms fall back to wall-clock time, + giving three maintained code paths plus one fallback. ## Alternatives @@ -391,5 +388,3 @@ on modern kernels. The higher precision is worth the identical cost. computed by the `UtilizationEmitter` for operators who prefer a ready-to-use ratio. - Add `mode="user"` / `mode="system"` tag split for deeper CPU profiling. -- Implement the Windows `GetThreadTimes` path once a `windows-sys` dependency - is available in Vector. diff --git a/src/cpu_time.rs b/src/cpu_time.rs index 097a41bfff838..978a39b2dbb28 100644 --- a/src/cpu_time.rs +++ b/src/cpu_time.rs @@ -6,6 +6,9 @@ use std::time::Duration; /// only the time the calling thread was actually scheduled on a CPU (true CPU /// time, excluding preemption and context switches to other threads/processes). /// +/// On Windows this uses `GetThreadTimes`, which provides the same guarantee +/// with 100ns granularity. +/// /// On other platforms this falls back to wall-clock time via /// [`std::time::Instant`]. /// @@ -73,12 +76,65 @@ impl Inner { } } +// ── Windows: GetThreadTimes ─────────────────────────────────────────────── + +#[cfg(target_os = "windows")] +struct Inner(Duration); + +#[cfg(target_os = "windows")] +impl Inner { + fn now() -> Self { + use windows_sys::Win32::Foundation::FILETIME; + use windows_sys::Win32::System::Threading::{GetCurrentThread, GetThreadTimes}; + + let mut creation = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + let mut exit = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + let mut kernel = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + let mut user = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + + // SAFETY: + // - `GetCurrentThread()` returns a pseudo-handle that is always valid + // and does not need to be closed. + // - All four `FILETIME` pointers are valid, properly aligned, and + // stack-allocated. + // - The return value is intentionally ignored: failure is only possible + // with an invalid handle, which cannot occur with `GetCurrentThread()`. + unsafe { + GetThreadTimes( + GetCurrentThread(), + &mut creation, + &mut exit, + &mut kernel, + &mut user, + ); + } + + // Combine the low/high halves of each FILETIME into a u64, then sum + // kernel + user. FILETIME units are 100-nanosecond intervals. + let kernel_ns = filetime_to_nanos(kernel); + let user_ns = filetime_to_nanos(user); + Inner(Duration::from_nanos(kernel_ns + user_ns)) + } + + #[inline] + fn elapsed(&self) -> Duration { + Self::now().0.saturating_sub(self.0) + } +} + +#[cfg(target_os = "windows")] +#[inline] +fn filetime_to_nanos(ft: windows_sys::Win32::Foundation::FILETIME) -> u64 { + let ticks = ((ft.dwHighDateTime as u64) << 32) | (ft.dwLowDateTime as u64); + ticks * 100 // convert 100ns intervals to nanoseconds +} + // ── Other platforms: wall-clock fallback ────────────────────────────────── -#[cfg(not(any(target_os = "linux", target_os = "macos")))] +#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] struct Inner(std::time::Instant); -#[cfg(not(any(target_os = "linux", target_os = "macos")))] +#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] impl Inner { fn now() -> Self { Inner(std::time::Instant::now()) From 8e4822bea2f4348cfaf11f059ea6ed803827f4a6 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Wed, 15 Apr 2026 13:43:54 +0200 Subject: [PATCH 04/19] rename to component_cpu_usage_seconds_total --- ...2026-04-13-component-cpu-seconds-metric.md | 20 +++++++++---------- src/topology/builder.rs | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/rfcs/2026-04-13-component-cpu-seconds-metric.md b/rfcs/2026-04-13-component-cpu-seconds-metric.md index 7397c2deb9805..fcd287a6ec55b 100644 --- a/rfcs/2026-04-13-component-cpu-seconds-metric.md +++ b/rfcs/2026-04-13-component-cpu-seconds-metric.md @@ -6,7 +6,7 @@ and function transforms can run concurrently across multiple tokio worker threads, and because wall-clock "not idle" includes time the OS has preempted the thread, this gauge does not accurately reflect how much CPU a component actually consumes. This RFC proposes a new **counter** metric, -`component_cpu_seconds_total`, that tracks the cumulative CPU time consumed +`component_cpu_usage_seconds_total`, that tracks the cumulative CPU time consumed by a component's transform work, measured via OS thread-level CPU clocks. ## Context @@ -29,7 +29,7 @@ by a component's transform work, measured via OS thread-level CPU clocks. ### In scope -- A new `component_cpu_seconds_total` counter for **sync and function +- A new `component_cpu_usage_seconds_total` counter for **sync and function transforms** (both inline and concurrent execution paths). - Two implementation tiers: a wall-clock fallback that works everywhere, and a precise thread-CPU-time implementation using OS APIs. @@ -74,7 +74,7 @@ by a component's transform work, measured via OS thread-level CPU clocks. A new counter metric is emitted for every sync/function transform: ```prometheus -component_cpu_seconds_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14.207 +component_cpu_usage_seconds_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14.207 ``` Operators use it exactly like `process_cpu_seconds_total` from the Prometheus @@ -82,10 +82,10 @@ ecosystem: ```promql # Per-component CPU core usage (can exceed 1.0 with concurrency) -component_cpu_seconds_total{component_id="my_remap"}.as_rate() +component_cpu_usage_seconds_total{component_id="my_remap"}.as_rate() # Compare against utilization to spot backpressure vs CPU bottleneck -rate(component_cpu_seconds_total{component_id="my_remap"}[1m]) +rate(component_cpu_usage_seconds_total{component_id="my_remap"}[1m]) / utilization{component_id="my_remap"} ``` @@ -97,7 +97,7 @@ configuration knob. #### Metric definition -Register a `counter!("component_cpu_seconds_total")` in the `Runner` struct, +Register a `counter!("component_cpu_usage_seconds_total")` in the `Runner` struct, alongside the existing `timer_tx` and `events_received` fields: ```rust @@ -350,7 +350,7 @@ on modern kernels. The higher precision is worth the identical cost. ## Outstanding Questions -1. **Metric name:** Should it be `component_cpu_seconds_total` (following +1. **Metric name:** Should it be `component_cpu_usage_seconds_total` (following Prometheus conventions and the `component_` prefix used by existing Vector metrics) or `transform_cpu_seconds_total` (since only transforms emit it initially)? The `component_` prefix is preferred for consistency and to allow @@ -366,20 +366,20 @@ on modern kernels. The higher precision is worth the identical cost. - Add `src/cpu_time.rs` module with `thread_cpu_time()` and platform-specific implementations behind `#[cfg]` gates. Include unit tests that verify the returned duration is non-zero and monotonically increasing. -- Register `counter!("component_cpu_seconds_total")` in `Runner::new` and +- Register `counter!("component_cpu_usage_seconds_total")` in `Runner::new` and instrument `run_inline` with wall-clock timing (Tier 1). - Instrument `run_concurrently` with wall-clock timing (Tier 1). Verify the counter increments correctly when multiple tasks run in parallel. - Switch from `Instant::now()` to `thread_cpu_time()` (Tier 2). Benchmark the overhead on Linux to confirm it is <100ns per call. - Add integration test: run a CPU-intensive remap transform, verify - `component_cpu_seconds_total` is within 10% of expected CPU time. + `component_cpu_usage_seconds_total` is within 10% of expected CPU time. - Add documentation for the new metric in the generated component docs. - Add changelog fragment. ## Future Improvements -- Extend `component_cpu_seconds_total` to **task transforms** by measuring CPU +- Extend `component_cpu_usage_seconds_total` to **task transforms** by measuring CPU time per `poll` of the transform stream. This requires careful accounting to exclude time spent in the tokio runtime between polls. - Extend to **sources and sinks** where the component owns a synchronous diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 87e62d1779f7e..60bdf19ec76f0 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -41,11 +41,11 @@ use super::{ }; use crate::{ SourceSender, - cpu_time::ThreadTime, config::{ ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId, ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput, }, + cpu_time::ThreadTime, event::{EventArray, EventContainer}, extra_context::ExtraContext, internal_events::EventsReceived, @@ -1151,7 +1151,7 @@ impl Runner { timer_tx, latency_recorder, events_received: register!(EventsReceived), - cpu_seconds: gauge!("component_cpu_seconds_total"), + cpu_seconds: gauge!("component_cpu_usage_seconds_total"), } } From 8b950b9e4dd429e9c69cb830e071911f382a68f7 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Wed, 15 Apr 2026 13:49:50 +0200 Subject: [PATCH 05/19] add documentation --- .../cue/reference/components/sources/internal_metrics.cue | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 98ed1ffe025d7..21fd2b3d840ae 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -359,6 +359,12 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } + component_cpu_usage_seconds_total: { + description: "The CPU time consumed by a component. Available for sync and function transforms." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } component_discarded_events_total: { description: "The number of events dropped by this component." type: "counter" From 7ab695418bcc08bdfb42672c1d3707a354b0dd2b Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Thu, 16 Apr 2026 09:37:08 +0200 Subject: [PATCH 06/19] add changelog --- changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md diff --git a/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md new file mode 100644 index 0000000000000..45d9ed496af56 --- /dev/null +++ b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md @@ -0,0 +1,4 @@ +Added a new counter metric `component_cpu_usage_seconds_total` counting the CPU +time consumed by a transform. Only supported for sync and function transforms. + +authors: gwenaskell From b76f8e719be7ad997971da0e3d07d6d6acd42177 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Thu, 16 Apr 2026 09:46:50 +0200 Subject: [PATCH 07/19] format --- src/cpu_time.rs | 27 +++++++++++++++++++++------ src/lib.rs | 2 +- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/cpu_time.rs b/src/cpu_time.rs index 978a39b2dbb28..7dc455e5939d2 100644 --- a/src/cpu_time.rs +++ b/src/cpu_time.rs @@ -87,10 +87,22 @@ impl Inner { use windows_sys::Win32::Foundation::FILETIME; use windows_sys::Win32::System::Threading::{GetCurrentThread, GetThreadTimes}; - let mut creation = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; - let mut exit = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; - let mut kernel = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; - let mut user = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; + let mut creation = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut exit = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut kernel = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut user = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; // SAFETY: // - `GetCurrentThread()` returns a pseudo-handle that is always valid @@ -112,7 +124,7 @@ impl Inner { // Combine the low/high halves of each FILETIME into a u64, then sum // kernel + user. FILETIME units are 100-nanosecond intervals. let kernel_ns = filetime_to_nanos(kernel); - let user_ns = filetime_to_nanos(user); + let user_ns = filetime_to_nanos(user); Inner(Duration::from_nanos(kernel_ns + user_ns)) } @@ -167,6 +179,9 @@ mod tests { let first = t0.elapsed(); let _: u64 = (0u64..10_000).sum(); let second = t0.elapsed(); - assert!(second >= first, "clock went backwards: {second:?} < {first:?}"); + assert!( + second >= first, + "clock went backwards: {second:?} < {first:?}" + ); } } diff --git a/src/lib.rs b/src/lib.rs index 41fcdefcd9695..2a876e05ea984 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,7 @@ pub mod aws; pub mod common; pub mod completion; mod convert_config; +pub(crate) mod cpu_time; pub mod encoding_transcode; pub mod enrichment_tables; pub mod extra_context; @@ -126,7 +127,6 @@ pub mod trace; pub mod transforms; pub mod types; pub mod unit_test; -pub(crate) mod cpu_time; pub(crate) mod utilization; pub mod validate; #[cfg(windows)] From 5a6a6b9f0a203f6ff9e1c39643be0cbcc7dae333 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Thu, 16 Apr 2026 11:38:14 +0200 Subject: [PATCH 08/19] use a counter and report nanoseconds --- ...2026-04-13-component-cpu-seconds-metric.md | 89 ++++++++++--------- src/topology/builder.rs | 12 +-- 2 files changed, 51 insertions(+), 50 deletions(-) diff --git a/rfcs/2026-04-13-component-cpu-seconds-metric.md b/rfcs/2026-04-13-component-cpu-seconds-metric.md index fcd287a6ec55b..cbc318b6caf46 100644 --- a/rfcs/2026-04-13-component-cpu-seconds-metric.md +++ b/rfcs/2026-04-13-component-cpu-seconds-metric.md @@ -6,8 +6,9 @@ and function transforms can run concurrently across multiple tokio worker threads, and because wall-clock "not idle" includes time the OS has preempted the thread, this gauge does not accurately reflect how much CPU a component actually consumes. This RFC proposes a new **counter** metric, -`component_cpu_usage_seconds_total`, that tracks the cumulative CPU time consumed -by a component's transform work, measured via OS thread-level CPU clocks. +`component_cpu_usage_ns_total`, that tracks the cumulative CPU time consumed +by a component's transform work in nanoseconds, measured via OS thread-level +CPU clocks. ## Context @@ -29,7 +30,7 @@ by a component's transform work, measured via OS thread-level CPU clocks. ### In scope -- A new `component_cpu_usage_seconds_total` counter for **sync and function +- A new `component_cpu_usage_ns_total` counter for **sync and function transforms** (both inline and concurrent execution paths). - Two implementation tiers: a wall-clock fallback that works everywhere, and a precise thread-CPU-time implementation using OS APIs. @@ -63,9 +64,9 @@ by a component's transform work, measured via OS thread-level CPU clocks. "2 cores" from "0.3 cores at 100% occupancy." 2. **No way to detect CPU-bound transforms.** Operators tuning pipelines need to - know which transforms are CPU-bottlenecked. A `cpu_seconds_total` counter, - when divided by wall-clock time, directly gives CPU core utilization and can - exceed 1.0 when a transform genuinely uses multiple cores. + know which transforms are CPU-bottlenecked. A `cpu_usage_ns_total` counter, + when divided by wall-clock time (in ns), directly gives CPU core utilization + and can exceed 1.0 when a transform genuinely uses multiple cores. ## Proposal @@ -74,18 +75,18 @@ by a component's transform work, measured via OS thread-level CPU clocks. A new counter metric is emitted for every sync/function transform: ```prometheus -component_cpu_usage_seconds_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14.207 +component_cpu_usage_ns_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14207 ``` -Operators use it exactly like `process_cpu_seconds_total` from the Prometheus -ecosystem: +The value is cumulative CPU nanoseconds consumed by the component. Operators +use it to compute CPU core utilization: ```promql # Per-component CPU core usage (can exceed 1.0 with concurrency) -component_cpu_usage_seconds_total{component_id="my_remap"}.as_rate() +rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9 -# Compare against utilization to spot backpressure vs CPU bottleneck -rate(component_cpu_usage_seconds_total{component_id="my_remap"}[1m]) +# Compare against utilization to separate CPU cost from pipeline pressure +rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9 / utilization{component_id="my_remap"} ``` @@ -97,7 +98,7 @@ configuration knob. #### Metric definition -Register a `counter!("component_cpu_usage_seconds_total")` in the `Runner` struct, +Register a `counter!("component_cpu_usage_ns_total")` in the `Runner` struct, alongside the existing `timer_tx` and `events_received` fields: ```rust @@ -109,10 +110,17 @@ struct Runner { timer_tx: UtilizationComponentSender, latency_recorder: LatencyRecorder, events_received: Registered, - cpu_seconds: Counter, // NEW + cpu_ns: Counter, // NEW } ``` +Using nanoseconds as the counter unit fits naturally in the `metrics` crate's +`u64`-based `Counter`: each call to `counter.increment(delta.as_nanos() as u64)` +is exact integer arithmetic with no floating-point accumulation. The `metrics` +crate stores the counter as `AtomicU64` and casts to `f64` only once at scrape +time, bounding the conversion error to a single rounding rather than an +accumulated sum of many small imprecise additions. + #### Supported OS: Thread CPU time (precise, Linux and macOS) For precise measurement, we read the calling thread's CPU clock before and after @@ -197,7 +205,7 @@ schedules another process onto the core during `transform_all`, that wall-clock time is counted as CPU time even though Vector was not executing. For small workloads (lightly loaded hosts, transforms that complete in -microseconds to low milliseconds), the preemption error is negligible. +microseconds to low nanoseconds), the preemption error is negligible. #### Why thread CPU time works for sync transforms @@ -224,13 +232,14 @@ interleave with unrelated futures on the same worker thread. #### Concurrent execution and multi-core accounting In the concurrent path (`run_concurrently`), each `tokio::spawn` task measures -its own CPU time independently. If 4 tasks each consume 0.25s of CPU in -parallel, the counter increments by 1.0s total — correctly reflecting that the -transform used 1.0 CPU-second even though only 0.25s of wall time elapsed. +its own CPU time independently and increments the shared counter handle (which +is `Clone` and backed by `AtomicU64::fetch_add`). If 4 tasks each consume 250ms +of CPU in parallel, the counter increments by 1000ns total — correctly +reflecting that the transform used 1 CPU-second even though only 250ms of wall +time elapsed. -This matches the semantics of `process_cpu_seconds_total` and Linux's -`/proc/[pid]/stat` CPU accounting: the counter can increase faster than -wall-clock time when work is parallelized. +The counter can therefore increase faster than wall-clock time (in ns), matching +the semantics of `process_cpu_seconds_total` for multi-core workloads. #### Integration into the Runner @@ -247,11 +256,9 @@ impl Runner { while let Some(events) = input_rx.next().await { self.on_events_received(&events); - let cpu_before = thread_cpu_time(); // NEW + let t0 = ThreadTime::now(); self.transform.transform_all(events, &mut outputs_buf); - let cpu_after = thread_cpu_time(); // NEW - let cpu_delta = cpu_after.saturating_sub(cpu_before); - self.cpu_seconds.increment(cpu_delta.as_secs_f64()); + self.cpu_ns.increment(t0.elapsed().as_nanos() as u64); // NEW self.send_outputs(&mut outputs_buf).await .map_err(TaskError::wrapped)?; @@ -267,15 +274,13 @@ impl Runner { // ... (existing event counting) ... let mut t = self.transform.clone(); let mut outputs_buf = self.outputs.new_buf_with_capacity(len); - let cpu_counter = self.cpu_seconds.clone(); // NEW + let cpu_ns = self.cpu_ns.clone(); // NEW let task = tokio::spawn(async move { - let cpu_before = thread_cpu_time(); // NEW + let t0 = ThreadTime::now(); for events in input_arrays { t.transform_all(events, &mut outputs_buf); } - let cpu_after = thread_cpu_time(); // NEW - let cpu_delta = cpu_after.saturating_sub(cpu_before); - cpu_counter.increment(cpu_delta.as_secs_f64()); + cpu_ns.increment(t0.elapsed().as_nanos() as u64); // NEW outputs_buf }.in_current_span()); in_flight.push_back(task); @@ -307,12 +312,13 @@ independently. - **Direct CPU cost visibility.** Operators can identify which transforms are CPU-bottlenecked vs. backpressure-limited, enabling informed tuning. -- **Composable with existing metrics.** `rate(cpu_seconds[1m])` gives CPU - cores used; dividing by `utilization` separates CPU from pipeline effects. +- **Composable with existing metrics.** `rate(component_cpu_usage_ns_total[1m]) / 1e9` + gives CPU cores used; dividing by `utilization` separates CPU from pipeline effects. - **Low overhead.** Two `clock_gettime` calls per batch (~80ns total on Linux) is negligible relative to the work `transform_all` performs. -- **Familiar semantics.** Follows the `cpu_seconds_total` convention from - Prometheus, Linux `/proc`, and cAdvisor — operators already know how to use it. +- **No accumulation errors.** The counter stores `u64` nanoseconds; each + increment is exact integer arithmetic. The single `u64 → f64` cast at scrape + time has bounded, non-accumulated error. ## Drawbacks @@ -337,7 +343,7 @@ Emit a histogram of per-event processing time instead of a cumulative counter. **Rejected because:** Histograms are expensive at high throughput (Vector processes millions of events/sec). A counter that increments once per batch is far cheaper. Per-event latency can be derived from the counter and -`events_sent_total` if needed (`cpu_seconds / events = avg cpu per event`). +`events_sent_total` if needed (`cpu_ns / events = avg cpu ns per event`). ### `getrusage(RUSAGE_THREAD)` instead of `clock_gettime` @@ -350,12 +356,7 @@ on modern kernels. The higher precision is worth the identical cost. ## Outstanding Questions -1. **Metric name:** Should it be `component_cpu_usage_seconds_total` (following - Prometheus conventions and the `component_` prefix used by existing Vector - metrics) or `transform_cpu_seconds_total` (since only transforms emit it - initially)? The `component_` prefix is preferred for consistency and to allow - future extension to sources and sinks. -2. **User/system split:** Should we report user and system CPU time separately +1. **User/system split:** Should we report user and system CPU time separately (as `mode="user"` / `mode="system"` tags) like `host_cpu_seconds_total` does? The Linux API supports this. It adds cardinality but helps distinguish transforms that trigger syscalls (e.g., enrichment table lookups) from pure @@ -366,20 +367,20 @@ on modern kernels. The higher precision is worth the identical cost. - Add `src/cpu_time.rs` module with `thread_cpu_time()` and platform-specific implementations behind `#[cfg]` gates. Include unit tests that verify the returned duration is non-zero and monotonically increasing. -- Register `counter!("component_cpu_usage_seconds_total")` in `Runner::new` and +- Register `counter!("component_cpu_usage_ns_total")` in `Runner::new` and instrument `run_inline` with wall-clock timing (Tier 1). - Instrument `run_concurrently` with wall-clock timing (Tier 1). Verify the counter increments correctly when multiple tasks run in parallel. - Switch from `Instant::now()` to `thread_cpu_time()` (Tier 2). Benchmark the overhead on Linux to confirm it is <100ns per call. - Add integration test: run a CPU-intensive remap transform, verify - `component_cpu_usage_seconds_total` is within 10% of expected CPU time. + `component_cpu_usage_ns_total` is within 10% of expected CPU time. - Add documentation for the new metric in the generated component docs. - Add changelog fragment. ## Future Improvements -- Extend `component_cpu_usage_seconds_total` to **task transforms** by measuring CPU +- Extend `component_cpu_usage_ns_total` to **task transforms** by measuring CPU time per `poll` of the transform stream. This requires careful accounting to exclude time spent in the tokio runtime between polls. - Extend to **sources and sinks** where the component owns a synchronous diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 60bdf19ec76f0..1369ce72382d5 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -8,7 +8,7 @@ use std::{ use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered}; use futures_util::stream::FuturesUnordered; -use metrics::{Gauge, gauge}; +use metrics::{Counter, counter, gauge}; use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire}; use tokio::{ select, @@ -1131,7 +1131,7 @@ struct Runner { timer_tx: UtilizationComponentSender, latency_recorder: LatencyRecorder, events_received: Registered, - cpu_seconds: Gauge, + cpu_ns: Counter, } impl Runner { @@ -1151,7 +1151,7 @@ impl Runner { timer_tx, latency_recorder, events_received: register!(EventsReceived), - cpu_seconds: gauge!("component_cpu_usage_seconds_total"), + cpu_ns: counter!("component_cpu_usage_ms_total"), } } @@ -1189,7 +1189,7 @@ impl Runner { self.on_events_received(&events); let t0 = ThreadTime::now(); self.transform.transform_all(events, &mut outputs_buf); - self.cpu_seconds.increment(t0.elapsed().as_secs_f64()); + self.cpu_ns.increment(t0.elapsed().as_nanos() as u64); self.send_outputs(&mut outputs_buf) .await .map_err(TaskError::wrapped)?; @@ -1238,13 +1238,13 @@ impl Runner { let mut t = self.transform.clone(); let mut outputs_buf = self.outputs.new_buf_with_capacity(len); - let cpu_seconds = self.cpu_seconds.clone(); + let cpu_ns = self.cpu_ns.clone(); let task = tokio::spawn(async move { let t0 = ThreadTime::now(); for events in input_arrays { t.transform_all(events, &mut outputs_buf); } - cpu_seconds.increment(t0.elapsed().as_secs_f64()); + cpu_ns.increment(t0.elapsed().as_nanos() as u64); outputs_buf }.in_current_span()); in_flight.push_back(task); From 4e5b7bfe637f7599a1342bd5025782131b8aa134 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Thu, 16 Apr 2026 11:40:33 +0200 Subject: [PATCH 09/19] update changelog --- changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md index 45d9ed496af56..7e0c7edcf333b 100644 --- a/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md +++ b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md @@ -1,4 +1,5 @@ -Added a new counter metric `component_cpu_usage_seconds_total` counting the CPU -time consumed by a transform. Only supported for sync and function transforms. +Added a new counter metric `component_cpu_usage_ns_total` counting the CPU +time consumed by a transform in nanoseconds. Only supported for sync and +function transforms. authors: gwenaskell From d39d8e814cf71223ea091d2b9157a9fd11c20534 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Thu, 16 Apr 2026 11:41:23 +0200 Subject: [PATCH 10/19] update docs --- website/cue/reference/components/sources/internal_metrics.cue | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 21fd2b3d840ae..c06e3089ee264 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -359,8 +359,8 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } - component_cpu_usage_seconds_total: { - description: "The CPU time consumed by a component. Available for sync and function transforms." + component_cpu_usage_ns_total: { + description: "The CPU time consumed by a component in nanoseconds. Available for sync and function transforms." type: "counter" default_namespace: "vector" tags: _component_tags From 9957e81006fba5340008c7ebea351d603dc06dd3 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Thu, 16 Apr 2026 11:44:50 +0200 Subject: [PATCH 11/19] update and rename rfc --- ...econds-metric.md => 2026-04-13-component-cpu-metric.md} | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) rename rfcs/{2026-04-13-component-cpu-seconds-metric.md => 2026-04-13-component-cpu-metric.md} (98%) diff --git a/rfcs/2026-04-13-component-cpu-seconds-metric.md b/rfcs/2026-04-13-component-cpu-metric.md similarity index 98% rename from rfcs/2026-04-13-component-cpu-seconds-metric.md rename to rfcs/2026-04-13-component-cpu-metric.md index cbc318b6caf46..d06030fcdac34 100644 --- a/rfcs/2026-04-13-component-cpu-seconds-metric.md +++ b/rfcs/2026-04-13-component-cpu-metric.md @@ -121,6 +121,10 @@ crate stores the counter as `AtomicU64` and casts to `f64` only once at scrape time, bounding the conversion error to a single rounding rather than an accumulated sum of many small imprecise additions. +This choice was necessary because the `tracing` API only supports integer +increments of counters, unlike `host_cpu_seconds_total` which is directly +submitted as a floating number. + #### Supported OS: Thread CPU time (precise, Linux and macOS) For precise measurement, we read the calling thread's CPU clock before and after @@ -238,9 +242,6 @@ of CPU in parallel, the counter increments by 1000ns total — correctly reflecting that the transform used 1 CPU-second even though only 250ms of wall time elapsed. -The counter can therefore increase faster than wall-clock time (in ns), matching -the semantics of `process_cpu_seconds_total` for multi-core workloads. - #### Integration into the Runner ```rust From 5a4c3f5c22badc38b7c153213080f4b77915498b Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Thu, 16 Apr 2026 17:14:03 +0200 Subject: [PATCH 12/19] fix typo --- src/topology/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 1369ce72382d5..0e9d818734c8b 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -1151,7 +1151,7 @@ impl Runner { timer_tx, latency_recorder, events_received: register!(EventsReceived), - cpu_ns: counter!("component_cpu_usage_ms_total"), + cpu_ns: counter!("component_cpu_usage_ns_total"), } } From 70a8aa95aa7e977961c815cffbbf6c873fe9e92a Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Mon, 20 Apr 2026 09:23:55 +0200 Subject: [PATCH 13/19] remove implementation from rfc --- rfcs/2026-04-13-component-cpu-metric.md | 215 ------------------------ 1 file changed, 215 deletions(-) diff --git a/rfcs/2026-04-13-component-cpu-metric.md b/rfcs/2026-04-13-component-cpu-metric.md index d06030fcdac34..794cb52eb17f3 100644 --- a/rfcs/2026-04-13-component-cpu-metric.md +++ b/rfcs/2026-04-13-component-cpu-metric.md @@ -94,221 +94,6 @@ rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9 This metric is always emitted for sync/function transforms; there is no configuration knob. -### Implementation - -#### Metric definition - -Register a `counter!("component_cpu_usage_ns_total")` in the `Runner` struct, -alongside the existing `timer_tx` and `events_received` fields: - -```rust -struct Runner { - transform: Box, - input_rx: Option>, - input_type: DataType, - outputs: TransformOutputs, - timer_tx: UtilizationComponentSender, - latency_recorder: LatencyRecorder, - events_received: Registered, - cpu_ns: Counter, // NEW -} -``` - -Using nanoseconds as the counter unit fits naturally in the `metrics` crate's -`u64`-based `Counter`: each call to `counter.increment(delta.as_nanos() as u64)` -is exact integer arithmetic with no floating-point accumulation. The `metrics` -crate stores the counter as `AtomicU64` and casts to `f64` only once at scrape -time, bounding the conversion error to a single rounding rather than an -accumulated sum of many small imprecise additions. - -This choice was necessary because the `tracing` API only supports integer -increments of counters, unlike `host_cpu_seconds_total` which is directly -submitted as a floating number. - -#### Supported OS: Thread CPU time (precise, Linux and macOS) - -For precise measurement, we read the calling thread's CPU clock before and after -`transform_all`. This counts only time the thread was actually scheduled on a -CPU, excluding preemption, involuntary context switches, and any time another -process used the core. - -**Linux and macOS — `clock_gettime(CLOCK_THREAD_CPUTIME_ID)`** - -```rust -#[cfg(any(target_os = "linux", target_os = "macos"))] -fn thread_cpu_time() -> Duration { - let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 }; - // SAFETY: ts is a valid pointer to a timespec struct and - // CLOCK_THREAD_CPUTIME_ID is a valid clock id on Linux >= 2.6 - // and macOS >= 10.12. - unsafe { - libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts); - } - Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32) -} -``` - -This API is available on both Linux and macOS (since 10.12 Sierra, 2016). It -measures CPU time for the **calling thread** with nanosecond granularity. -Since `transform_all` is synchronous and runs entirely within a single thread -poll, the delta between two calls around `transform_all` gives exact CPU time -consumed by that transform invocation. - -**Overhead:** On Linux, `clock_gettime(CLOCK_THREAD_CPUTIME_ID)` is -vDSO-accelerated and costs ~20-60ns per call. On macOS the kernel path is -slightly heavier (~100-200ns) but still negligible compared to actual transform -work. With two calls per `transform_all` invocation the total overhead is -well under 500ns per batch on either platform. - -**Windows — `GetThreadTimes`** - -Windows exposes per-thread CPU time via `GetThreadTimes`, providing the same -guarantee as `CLOCK_THREAD_CPUTIME_ID` with 100ns granularity. It is -implemented using the `windows-sys` crate (added as a -`[target.'cfg(windows)'.dependencies]`), which is already a transitive -dependency of Vector. - -```rust -#[cfg(target_os = "windows")] -fn thread_cpu_time() -> Duration { - use windows_sys::Win32::Foundation::FILETIME; - use windows_sys::Win32::System::Threading::{GetCurrentThread, GetThreadTimes}; - - let mut creation = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; - let mut exit = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; - let mut kernel = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; - let mut user = FILETIME { dwLowDateTime: 0, dwHighDateTime: 0 }; - - // SAFETY: GetCurrentThread() returns a pseudo-handle that is always valid. - unsafe { - GetThreadTimes(GetCurrentThread(), &mut creation, &mut exit, &mut kernel, &mut user); - } - - // FILETIME stores time as 100ns intervals split across two u32 fields. - let to_nanos = |ft: FILETIME| { - (((ft.dwHighDateTime as u64) << 32) | ft.dwLowDateTime as u64) * 100 - }; - Duration::from_nanos(to_nanos(kernel) + to_nanos(user)) -} -``` - -#### Fallback: Wall-clock timing (other platforms) - -On any platform not covered above, fall back to wall-clock time: - -```rust -fn thread_cpu_time() -> Duration { - Instant::now() -} -``` - -This is simple and portable. Its accuracy is good for CPU-bound sync transforms -because `transform_all` is a synchronous call that does not yield to the tokio -runtime. The main source of inaccuracy is OS-level thread preemption: if the OS -schedules another process onto the core during `transform_all`, that wall-clock -time is counted as CPU time even though Vector was not executing. - -For small workloads (lightly loaded hosts, transforms that complete in -microseconds to low nanoseconds), the preemption error is negligible. - -#### Why thread CPU time works for sync transforms - -The critical property that makes this approach accurate is that `transform_all` -is a **synchronous, non-yielding** call. When the tokio runtime polls the future -containing `transform_all`: - -1. The runtime's worker thread enters the poll. -2. `transform_all` executes to completion without any `.await` points. -3. Control returns to the runtime. - -Between steps 1 and 3, the worker thread is exclusively executing transform -code. Reading the thread CPU clock before and after captures exactly the CPU -time consumed, regardless of: - -- Other tokio tasks that may be queued (they can't preempt a synchronous call). -- OS preemption (thread CPU clock excludes time spent not running). -- Other concurrent `tokio::spawn` tasks on different threads (each measures its - own thread independently). - -This would **not** work for async task transforms, where a single `poll` may -interleave with unrelated futures on the same worker thread. - -#### Concurrent execution and multi-core accounting - -In the concurrent path (`run_concurrently`), each `tokio::spawn` task measures -its own CPU time independently and increments the shared counter handle (which -is `Clone` and backed by `AtomicU64::fetch_add`). If 4 tasks each consume 250ms -of CPU in parallel, the counter increments by 1000ns total — correctly -reflecting that the transform used 1 CPU-second even though only 250ms of wall -time elapsed. - -#### Integration into the Runner - -```rust -impl Runner { - async fn run_inline(mut self) -> TaskResult { - const INLINE_BATCH_SIZE: usize = 128; - let mut outputs_buf = self.outputs.new_buf_with_capacity(INLINE_BATCH_SIZE); - let mut input_rx = self.input_rx.take().expect("can't run runner twice") - .into_stream() - .filter(move |events| ready(filter_events_type(events, self.input_type))); - - self.timer_tx.try_send_start_wait(); - while let Some(events) = input_rx.next().await { - self.on_events_received(&events); - - let t0 = ThreadTime::now(); - self.transform.transform_all(events, &mut outputs_buf); - self.cpu_ns.increment(t0.elapsed().as_nanos() as u64); // NEW - - self.send_outputs(&mut outputs_buf).await - .map_err(TaskError::wrapped)?; - } - Ok(TaskOutput::Transform) - } - - async fn run_concurrently(mut self) -> TaskResult { - // ... (existing setup) ... - input_arrays = input_rx.next(), ... => { - match input_arrays { - Some(input_arrays) => { - // ... (existing event counting) ... - let mut t = self.transform.clone(); - let mut outputs_buf = self.outputs.new_buf_with_capacity(len); - let cpu_ns = self.cpu_ns.clone(); // NEW - let task = tokio::spawn(async move { - let t0 = ThreadTime::now(); - for events in input_arrays { - t.transform_all(events, &mut outputs_buf); - } - cpu_ns.increment(t0.elapsed().as_nanos() as u64); // NEW - outputs_buf - }.in_current_span()); - in_flight.push_back(task); - } - // ... - } - } - // ... - } -} -``` - -#### Module structure - -Add a new module `src/cpu_time.rs`: - -```rust -/// Returns the CPU time consumed by the calling thread. -/// -/// On Linux and macOS, uses clock_gettime(CLOCK_THREAD_CPUTIME_ID) (nanosecond precision). -/// On other platforms, falls back to Instant::now() (wall-clock time). -pub(crate) fn thread_cpu_time() -> Duration { ... } -``` - -This keeps the platform-specific FFI contained in one file and testable -independently. - ## Rationale - **Direct CPU cost visibility.** Operators can identify which transforms are From 137a7d47dcd58ba818d689f7d68347a6892e4c96 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 28 Apr 2026 17:16:30 +0200 Subject: [PATCH 14/19] refactor(metrics): hook component CPU measurement onto task poll boundary For the concurrent transform path, replace the inline ThreadTime brackets inside the spawned async block with a CpuTimedFuture adapter that samples thread CPU time around every Future::poll and accumulates the delta into the component_cpu_usage_ns_total counter. Within a single poll tokio cannot migrate the task or run another task on this thread, so each pair is a clean per-thread CPU measurement; multi-poll futures accumulate correctly, which keeps the wrapper applicable if the body ever grows .await points or to future task-transform coverage. The inline path is unchanged: its body is sync and runs in the transform's own task, so direct measurement is the simplest correct option. The RFC is updated to describe the wrapper approach in Rationale, Plan Of Attack, and Future Improvements. Co-Authored-By: Claude Opus 4.7 (1M context) --- rfcs/2026-04-13-component-cpu-metric.md | 58 +++++++++++++++++-------- src/cpu_time.rs | 51 +++++++++++++++++++++- src/topology/builder.rs | 29 ++++++++----- 3 files changed, 108 insertions(+), 30 deletions(-) diff --git a/rfcs/2026-04-13-component-cpu-metric.md b/rfcs/2026-04-13-component-cpu-metric.md index 794cb52eb17f3..3695156981846 100644 --- a/rfcs/2026-04-13-component-cpu-metric.md +++ b/rfcs/2026-04-13-component-cpu-metric.md @@ -38,11 +38,11 @@ CPU clocks. ### Out of scope -- Task transforms (async stream-based). Their execution interleaves with the - tokio runtime in ways that make per-poll CPU measurement a distinct problem. - Furthermore, all task transforms in Vector are currently single-threaded (they - do not parallelize work), making the `utilization` metric a good indicator of - their actual usage. +- Task transforms (async stream-based). The poll-hook wrapper described below + could be applied to them, but they are currently single-threaded (they do not + parallelize work), so the `utilization` gauge is already a good indicator of + their actual usage. Extending coverage to task transforms is left as a + follow-up. - Sources and sinks. - Replacing or modifying the existing `utilization` gauge. @@ -100,7 +100,17 @@ configuration knob. CPU-bottlenecked vs. backpressure-limited, enabling informed tuning. - **Composable with existing metrics.** `rate(component_cpu_usage_ns_total[1m]) / 1e9` gives CPU cores used; dividing by `utilization` separates CPU from pipeline effects. -- **Low overhead.** Two `clock_gettime` calls per batch (~80ns total on Linux) +- **Measurement is hooked at the task's poll boundary.** For the concurrent + path, the spawned tokio task's future is wrapped in an adapter that samples + thread CPU time around every call to `Future::poll`. Tokio's cooperative + scheduler guarantees that within a single poll the task cannot be moved to + another worker thread and no other task can run on the current thread, so + each `(before_poll, after_poll)` pair is a clean per-thread measurement. + Multiple polls (across `Pending` returns and wake-ups) accumulate correctly, + with each poll independently sampling the thread it ran on. This isolates + the timing concern from the transform body and keeps it robust if the body + ever grows `.await` points. +- **Low overhead.** Two `clock_gettime` calls per poll (~80ns total on Linux) is negligible relative to the work `transform_all` performs. - **No accumulation errors.** The counter stores `u64` nanoseconds; each increment is exact integer arithmetic. The single `u64 → f64` cast at scrape @@ -150,15 +160,23 @@ on modern kernels. The higher precision is worth the identical cost. ## Plan Of Attack -- Add `src/cpu_time.rs` module with `thread_cpu_time()` and platform-specific - implementations behind `#[cfg]` gates. Include unit tests that verify the - returned duration is non-zero and monotonically increasing. -- Register `counter!("component_cpu_usage_ns_total")` in `Runner::new` and - instrument `run_inline` with wall-clock timing (Tier 1). -- Instrument `run_concurrently` with wall-clock timing (Tier 1). Verify the - counter increments correctly when multiple tasks run in parallel. -- Switch from `Instant::now()` to `thread_cpu_time()` (Tier 2). Benchmark - the overhead on Linux to confirm it is <100ns per call. +- Add `src/cpu_time.rs` module exposing: + - A `ThreadTime` snapshot with platform-specific implementations behind + `#[cfg]` gates (Linux/macOS `CLOCK_THREAD_CPUTIME_ID`, Windows + `GetThreadTimes`, wall-clock fallback elsewhere). Include unit tests that + verify the returned duration is non-negative and monotone. + - A `CpuTimedFuture` adapter that wraps a future and, on every + `Future::poll`, samples `ThreadTime` before and after the inner poll and + increments a `metrics::Counter` by the delta. +- Register `counter!("component_cpu_usage_ns_total")` in `Runner::new`. +- For `run_inline`, bracket the synchronous `transform_all` call directly with + `ThreadTime::now()` / `elapsed()`. The transform task itself owns this code + and there is no `.await` between the brackets, so inline measurement is the + simplest correct option. +- For `run_concurrently`, wrap the spawned per-batch future in + `CpuTimedFuture::new(_, cpu_ns.clone())` rather than measuring inline. This + hooks the measurement onto the task's `Future::poll` boundary and makes the + pattern uniform for any future async work added inside the spawned body. - Add integration test: run a CPU-intensive remap transform, verify `component_cpu_usage_ns_total` is within 10% of expected CPU time. - Add documentation for the new metric in the generated component docs. @@ -166,12 +184,14 @@ on modern kernels. The higher precision is worth the identical cost. ## Future Improvements -- Extend `component_cpu_usage_ns_total` to **task transforms** by measuring CPU - time per `poll` of the transform stream. This requires careful accounting to - exclude time spent in the tokio runtime between polls. +- Extend `component_cpu_usage_ns_total` to **task transforms** by wrapping the + task transform's stream future in `CpuTimedFuture`. Time spent in the tokio + runtime between polls is naturally excluded: thread CPU time only ticks + while the thread is running poll, and the wrapper only samples around poll + calls. - Extend to **sources and sinks** where the component owns a synchronous processing step (e.g., codec encoding in sinks). -- Expose a derived `**cpu_utilization` gauge\*\* (CPU seconds / wall seconds) +- Expose a derived **`cpu_utilization` gauge** (CPU seconds / wall seconds) computed by the `UtilizationEmitter` for operators who prefer a ready-to-use ratio. - Add `mode="user"` / `mode="system"` tag split for deeper CPU profiling. diff --git a/src/cpu_time.rs b/src/cpu_time.rs index 7dc455e5939d2..59864e76b9175 100644 --- a/src/cpu_time.rs +++ b/src/cpu_time.rs @@ -1,4 +1,12 @@ -use std::time::Duration; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use metrics::Counter; +use pin_project::pin_project; /// An opaque snapshot of thread CPU time. /// @@ -158,6 +166,47 @@ impl Inner { } } +// ── CpuTimedFuture: per-poll CPU time accumulator ───────────────────────── + +/// A [`Future`] adapter that accumulates thread CPU time across every `poll`. +/// +/// Each call to [`Future::poll`] is bracketed by a [`ThreadTime`] sample; +/// the delta is added to `counter`. Tokio's executor cannot migrate a task +/// to another worker thread or run another task on the current thread between +/// the entry and exit of a single `poll`, so each delta is a clean per-thread +/// CPU-time measurement of the wrapped future's work for that poll. Multiple +/// polls (across `Pending` returns and wake-ups) accumulate into the same +/// counter, with each poll independently sampling the thread it ran on. +/// +/// This is the per-task analogue of tokio's unstable +/// `on_before_task_poll` / `on_after_task_poll` runtime hooks: it hooks the +/// same boundary, but on a single future rather than the whole runtime, and +/// it works on stable Rust without `--cfg tokio_unstable`. +#[pin_project] +pub(crate) struct CpuTimedFuture { + #[pin] + inner: F, + counter: Counter, +} + +impl CpuTimedFuture { + pub(crate) const fn new(inner: F, counter: Counter) -> Self { + Self { inner, counter } + } +} + +impl Future for CpuTimedFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let t0 = ThreadTime::now(); + let result = this.inner.poll(cx); + this.counter.increment(t0.elapsed().as_nanos() as u64); + result + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 0e9d818734c8b..d0586b0100346 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -45,7 +45,7 @@ use crate::{ ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId, ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput, }, - cpu_time::ThreadTime, + cpu_time::{CpuTimedFuture, ThreadTime}, event::{EventArray, EventContainer}, extra_context::ExtraContext, internal_events::EventsReceived, @@ -1238,15 +1238,24 @@ impl Runner { let mut t = self.transform.clone(); let mut outputs_buf = self.outputs.new_buf_with_capacity(len); - let cpu_ns = self.cpu_ns.clone(); - let task = tokio::spawn(async move { - let t0 = ThreadTime::now(); - for events in input_arrays { - t.transform_all(events, &mut outputs_buf); - } - cpu_ns.increment(t0.elapsed().as_nanos() as u64); - outputs_buf - }.in_current_span()); + // Hook CPU-time accounting onto the spawned task at + // the `Future::poll` boundary. The transform body + // contains no `.await`, so a single poll runs to + // completion on one worker thread; if a future + // refactor adds awaits, accumulation across polls + // remains correct. + let task = tokio::spawn( + CpuTimedFuture::new( + async move { + for events in input_arrays { + t.transform_all(events, &mut outputs_buf); + } + outputs_buf + }, + self.cpu_ns.clone(), + ) + .in_current_span(), + ); in_flight.push_back(task); } None => { From be305c0946cb5f328cf273cd80eb501559d82b19 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 28 Apr 2026 17:22:27 +0200 Subject: [PATCH 15/19] refactor(metrics): expose CpuTimedFuture via a CpuTimedExt extension trait Replace the explicit CpuTimedFuture::new constructor with a CpuTimedExt trait so the wrapper composes naturally with .in_current_span() and similar future-extension methods: async move { ... } .cpu_timed(cpu_ns.clone()) .in_current_span() Mirrors the style of tracing::Instrument::in_current_span. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- rfcs/2026-04-13-component-cpu-metric.md | 12 ++++++----- src/cpu_time.rs | 27 +++++++++++++++++++------ src/topology/builder.rs | 18 ++++++++--------- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/rfcs/2026-04-13-component-cpu-metric.md b/rfcs/2026-04-13-component-cpu-metric.md index 3695156981846..3d607e3b1a238 100644 --- a/rfcs/2026-04-13-component-cpu-metric.md +++ b/rfcs/2026-04-13-component-cpu-metric.md @@ -167,16 +167,18 @@ on modern kernels. The higher precision is worth the identical cost. verify the returned duration is non-negative and monotone. - A `CpuTimedFuture` adapter that wraps a future and, on every `Future::poll`, samples `ThreadTime` before and after the inner poll and - increments a `metrics::Counter` by the delta. + increments a `metrics::Counter` by the delta. A `CpuTimedExt` extension + trait exposes it as a chained `.cpu_timed(counter)` call, mirroring + `tracing::Instrument::in_current_span`. - Register `counter!("component_cpu_usage_ns_total")` in `Runner::new`. - For `run_inline`, bracket the synchronous `transform_all` call directly with `ThreadTime::now()` / `elapsed()`. The transform task itself owns this code and there is no `.await` between the brackets, so inline measurement is the simplest correct option. -- For `run_concurrently`, wrap the spawned per-batch future in - `CpuTimedFuture::new(_, cpu_ns.clone())` rather than measuring inline. This - hooks the measurement onto the task's `Future::poll` boundary and makes the - pattern uniform for any future async work added inside the spawned body. +- For `run_concurrently`, wrap the spawned per-batch future via + `.cpu_timed(cpu_ns.clone())` rather than measuring inline. This hooks the + measurement onto the task's `Future::poll` boundary and makes the pattern + uniform for any future async work added inside the spawned body. - Add integration test: run a CPU-intensive remap transform, verify `component_cpu_usage_ns_total` is within 10% of expected CPU time. - Add documentation for the new metric in the generated component docs. diff --git a/src/cpu_time.rs b/src/cpu_time.rs index 59864e76b9175..24068be8553b4 100644 --- a/src/cpu_time.rs +++ b/src/cpu_time.rs @@ -182,6 +182,8 @@ impl Inner { /// `on_before_task_poll` / `on_after_task_poll` runtime hooks: it hooks the /// same boundary, but on a single future rather than the whole runtime, and /// it works on stable Rust without `--cfg tokio_unstable`. +/// +/// Construct it via [`CpuTimedExt::cpu_timed`]. #[pin_project] pub(crate) struct CpuTimedFuture { #[pin] @@ -189,12 +191,6 @@ pub(crate) struct CpuTimedFuture { counter: Counter, } -impl CpuTimedFuture { - pub(crate) const fn new(inner: F, counter: Counter) -> Self { - Self { inner, counter } - } -} - impl Future for CpuTimedFuture { type Output = F::Output; @@ -207,6 +203,25 @@ impl Future for CpuTimedFuture { } } +/// Extension trait that wraps a future in [`CpuTimedFuture`] via a chained +/// call: +/// +/// ```ignore +/// async move { /* work */ }.cpu_timed(counter) +/// ``` +/// +/// Mirrors the style of [`tracing::Instrument::in_current_span`]. +pub(crate) trait CpuTimedExt: Future + Sized { + fn cpu_timed(self, counter: Counter) -> CpuTimedFuture { + CpuTimedFuture { + inner: self, + counter, + } + } +} + +impl CpuTimedExt for F {} + #[cfg(test)] mod tests { use super::*; diff --git a/src/topology/builder.rs b/src/topology/builder.rs index d0586b0100346..3e1ffa88a2870 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -45,7 +45,7 @@ use crate::{ ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId, ProxyConfig, SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput, }, - cpu_time::{CpuTimedFuture, ThreadTime}, + cpu_time::{CpuTimedExt, ThreadTime}, event::{EventArray, EventContainer}, extra_context::ExtraContext, internal_events::EventsReceived, @@ -1245,15 +1245,13 @@ impl Runner { // refactor adds awaits, accumulation across polls // remains correct. let task = tokio::spawn( - CpuTimedFuture::new( - async move { - for events in input_arrays { - t.transform_all(events, &mut outputs_buf); - } - outputs_buf - }, - self.cpu_ns.clone(), - ) + async move { + for events in input_arrays { + t.transform_all(events, &mut outputs_buf); + } + outputs_buf + } + .cpu_timed(self.cpu_ns.clone()) .in_current_span(), ); in_flight.push_back(task); From 05fc4f627b94ffb48ef3a50a2c8dae2bc9aa0200 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 28 Apr 2026 18:18:58 +0200 Subject: [PATCH 16/19] feat(metrics): emit component_cpu_usage_ns_total for all transforms MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a `cpu_ns: Counter` field to `TransformContext`, defaulting to `Counter::noop()`. The topology builder resolves the counter once, inside the transform `error_span!` so it is tagged with the right component_id / component_kind / component_type, and stores it on the context. This is the single Counter handle every transform path consumes — sync, task, and any helper tokio tasks — so label resolution and recorder lookup are paid once at construction time rather than on every poll. For task transforms (`build_task_transform`), wrap the outer task future with `.cpu_timed(counter)` before `.boxed()`. CPU time is accumulated across every poll of the task; multi-poll futures accumulate correctly, and time the task spends parked in `Pending` is naturally excluded. For transforms that spawn long-running helper tokio tasks at construction time, plumb the counter through and `.cpu_timed(...)` those spawns too: - `aws_ec2_metadata`: the periodic IMDS-refresh worker. - `throttle`'s `RateLimiterRunner`: the periodic `retain_recent` flush loop. The counter is plumbed through `RateLimiterRunner::start` as a parameter. Without this, those helpers' CPU would be silently excluded. The bracket scope for task transforms is slightly wider than for sync transforms — it includes input-channel polls, the Utilization / OutputUtilization wrappers, and the fanout-send loop — but channel / fanout overhead is small relative to transform work, so the metric remains comparable across kinds. RFC and changelog updated to reflect the broader coverage. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...12-add-per-component-cpu-metric.feature.md | 4 +- rfcs/2026-04-13-component-cpu-metric.md | 79 ++++++++++++------- src/config/transform.rs | 8 ++ src/topology/builder.rs | 15 +++- src/transforms/aws_ec2_metadata.rs | 6 ++ src/transforms/throttle/rate_limiter.rs | 27 +++++-- src/transforms/throttle/transform.rs | 10 ++- 7 files changed, 110 insertions(+), 39 deletions(-) diff --git a/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md index 7e0c7edcf333b..bb56ff649e215 100644 --- a/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md +++ b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md @@ -1,5 +1,5 @@ Added a new counter metric `component_cpu_usage_ns_total` counting the CPU -time consumed by a transform in nanoseconds. Only supported for sync and -function transforms. +time consumed by a transform in nanoseconds. Supported for sync, function, +and task transforms. authors: gwenaskell diff --git a/rfcs/2026-04-13-component-cpu-metric.md b/rfcs/2026-04-13-component-cpu-metric.md index 3d607e3b1a238..c3bd3809745f7 100644 --- a/rfcs/2026-04-13-component-cpu-metric.md +++ b/rfcs/2026-04-13-component-cpu-metric.md @@ -1,4 +1,4 @@ -# RFC 2026-04-13 - Per-component CPU time metric for sync transforms +# RFC 2026-04-13 - Per-component CPU time metric for transforms The current `utilization` gauge measures the fraction of wall-clock time a component is not idle (i.e., not waiting on its input channel). Because sync @@ -15,7 +15,9 @@ CPU clocks. - The existing `utilization` metric is implemented in `src/utilization.rs`. - Sync and function transforms are spawned in `src/topology/builder.rs` via the `Runner` struct (`run_inline` and `run_concurrently` methods). -- The `enable_concurrency` trait method controls whether a transform is +- Task transforms are built in `src/topology/builder.rs::build_task_transform` + and run as a single async task driving a stream-based pipeline. +- The `enable_concurrency` trait method controls whether a sync transform is dispatched to parallel `tokio::spawn` tasks (up to `TRANSFORM_CONCURRENCY_LIMIT`, which defaults to the number of worker threads). @@ -24,25 +26,21 @@ CPU clocks. - The `utilization` gauge remains as-is. This RFC adds a complementary metric; it does not replace the existing one. -- Future work could extend this approach to task transforms and sinks. +- Future work could extend this approach to sources and sinks. ## Scope ### In scope -- A new `component_cpu_usage_ns_total` counter for **sync and function - transforms** (both inline and concurrent execution paths). +- A new `component_cpu_usage_ns_total` counter for **all transforms** — + sync and function transforms (both inline and concurrent execution paths) + and task transforms. - Two implementation tiers: a wall-clock fallback that works everywhere, and a precise thread-CPU-time implementation using OS APIs. - Feasibility analysis of thread-level CPU time measurement. ### Out of scope -- Task transforms (async stream-based). The poll-hook wrapper described below - could be applied to them, but they are currently single-threaded (they do not - parallelize work), so the `utilization` gauge is already a good indicator of - their actual usage. Extending coverage to task transforms is left as a - follow-up. - Sources and sinks. - Replacing or modifying the existing `utilization` gauge. @@ -72,7 +70,7 @@ CPU clocks. ### User Experience -A new counter metric is emitted for every sync/function transform: +A new counter metric is emitted for every transform: ```prometheus component_cpu_usage_ns_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14207 @@ -91,8 +89,7 @@ rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9 utilization{component_id="my_remap"} ``` -This metric is always emitted for sync/function transforms; there is no -configuration knob. +This metric is always emitted for transforms; there is no configuration knob. ## Rationale @@ -101,15 +98,30 @@ configuration knob. - **Composable with existing metrics.** `rate(component_cpu_usage_ns_total[1m]) / 1e9` gives CPU cores used; dividing by `utilization` separates CPU from pipeline effects. - **Measurement is hooked at the task's poll boundary.** For the concurrent - path, the spawned tokio task's future is wrapped in an adapter that samples - thread CPU time around every call to `Future::poll`. Tokio's cooperative - scheduler guarantees that within a single poll the task cannot be moved to - another worker thread and no other task can run on the current thread, so - each `(before_poll, after_poll)` pair is a clean per-thread measurement. - Multiple polls (across `Pending` returns and wake-ups) accumulate correctly, - with each poll independently sampling the thread it ran on. This isolates - the timing concern from the transform body and keeps it robust if the body - ever grows `.await` points. + sync path and for task transforms, the spawned tokio task's future is wrapped + in an adapter that samples thread CPU time around every call to + `Future::poll`. Tokio's cooperative scheduler guarantees that within a single + poll the task cannot be moved to another worker thread and no other task can + run on the current thread, so each `(before_poll, after_poll)` pair is a + clean per-thread measurement. Multiple polls (across `Pending` returns and + wake-ups) accumulate correctly, with each poll independently sampling the + thread it ran on. This isolates the timing concern from the transform body + and keeps it robust if the body ever grows `.await` points (which task + transforms have many of, by construction). +- **Scope of the measurement.** For sync transforms (inline and concurrent), + the bracket covers exactly the synchronous transform call. For task + transforms, the bracket covers the entire task body, which additionally + includes input-channel polls, the `Utilization`/`OutputUtilization` stream + wrappers' overhead, and the fanout-send loop. In practice the channel/fanout + overhead is small relative to the transform's own work, so the metric remains + a meaningful comparator across transform kinds. +- **Helper tasks are attributed too.** A few transforms spawn long-running + helper tokio tasks at construction time — `aws_ec2_metadata` (periodic IMDS + refresh) and `throttle` via `RateLimiterRunner` (periodic rate-limit-key + flush). The `cpu_ns` counter is plumbed through `TransformContext` so those + helpers can wrap their spawn with the same `.cpu_timed(counter)` and have + their CPU attributed to the same component, rather than being silently + excluded. - **Low overhead.** Two `clock_gettime` calls per poll (~80ns total on Linux) is negligible relative to the work `transform_all` performs. - **No accumulation errors.** The counter stores `u64` nanoseconds; each @@ -170,7 +182,14 @@ on modern kernels. The higher precision is worth the identical cost. increments a `metrics::Counter` by the delta. A `CpuTimedExt` extension trait exposes it as a chained `.cpu_timed(counter)` call, mirroring `tracing::Instrument::in_current_span`. -- Register `counter!("component_cpu_usage_ns_total")` in `Runner::new`. +- Add a `cpu_ns: Counter` field to `TransformContext`, defaulting to + `Counter::noop()`. In `build_transforms`, register the counter inside the + transform `error_span!` so it is tagged with `component_id`, + `component_kind`, and `component_type`, then store it on the context. This + is the single resolved handle every transform path consumes — sync, task, + and any helper tokio tasks — so labels and recorder lookup are paid once, + not on every poll. Also propagate the same handle through `TransformNode` + for the topology builder's own use. - For `run_inline`, bracket the synchronous `transform_all` call directly with `ThreadTime::now()` / `elapsed()`. The transform task itself owns this code and there is no `.await` between the brackets, so inline measurement is the @@ -179,6 +198,15 @@ on modern kernels. The higher precision is worth the identical cost. `.cpu_timed(cpu_ns.clone())` rather than measuring inline. This hooks the measurement onto the task's `Future::poll` boundary and makes the pattern uniform for any future async work added inside the spawned body. +- For `build_task_transform`, take the counter from `TransformNode` (which + receives it from the context) and wrap the outer task future with + `.cpu_timed(counter)` before `.boxed()`. CPU time accumulates across every + poll of the task, naturally excluding time the task is parked in `Pending`. +- For transforms that spawn helper tokio tasks at construction time + (`aws_ec2_metadata`, `throttle`'s `RateLimiterRunner`), read + `context.cpu_ns.clone()` in `build()` and `.cpu_timed(...)` the spawned + helper future. For `RateLimiterRunner::start`, plumb the counter through as + a parameter so it stays the throttle config's responsibility to provide it. - Add integration test: run a CPU-intensive remap transform, verify `component_cpu_usage_ns_total` is within 10% of expected CPU time. - Add documentation for the new metric in the generated component docs. @@ -186,11 +214,6 @@ on modern kernels. The higher precision is worth the identical cost. ## Future Improvements -- Extend `component_cpu_usage_ns_total` to **task transforms** by wrapping the - task transform's stream future in `CpuTimedFuture`. Time spent in the tokio - runtime between polls is naturally excluded: thread CPU time only ticks - while the thread is running poll, and the wrapper only samples around poll - calls. - Extend to **sources and sinks** where the component owns a synchronous processing step (e.g., codec encoding in sinks). - Expose a derived **`cpu_utilization` gauge** (CPU seconds / wall seconds) diff --git a/src/config/transform.rs b/src/config/transform.rs index 7f5d009030921..586a4923ac91e 100644 --- a/src/config/transform.rs +++ b/src/config/transform.rs @@ -6,6 +6,7 @@ use std::{ use async_trait::async_trait; use dyn_clone::DynClone; +use metrics::Counter; use serde::Serialize; use vector_lib::{ config::{GlobalOptions, Input, LogNamespace, TransformOutput}, @@ -141,6 +142,12 @@ pub struct TransformContext { /// Extra context data provided by the running app and shared across all components. This can be /// used to pass shared settings or other data from outside the components. pub extra_context: ExtraContext, + + /// Counter handle for `component_cpu_usage_ns_total`, pre-tagged with this transform's + /// component identity. Transforms that spawn helper tokio tasks at construction time + /// (e.g. `aws_ec2_metadata`, `throttle`) clone this and `.cpu_timed(...)` those tasks so + /// their CPU is attributed to the component alongside the main transform task. + pub cpu_ns: Counter, } impl Default for TransformContext { @@ -154,6 +161,7 @@ impl Default for TransformContext { merged_schema_definition: schema::Definition::any(), schema: SchemaOptions::default(), extra_context: Default::default(), + cpu_ns: Counter::noop(), } } } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 3e1ffa88a2870..fbf9cb01d5eee 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -482,6 +482,11 @@ impl<'a> Builder<'a> { }) .collect::>(); + // Resolve the per-component CPU counter inside the transform span so it + // picks up component_id/component_kind/component_type tags. The same + // handle is shared between the main transform task and any helper + // tokio tasks the transform spawns at construction time. + let cpu_ns = counter!("component_cpu_usage_ns_total"); let context = TransformContext { key: Some(key.clone()), globals: self.config.global.clone(), @@ -491,6 +496,7 @@ impl<'a> Builder<'a> { merged_schema_definition: merged_definition.clone(), schema: self.config.schema, extra_context: self.extra_context.clone(), + cpu_ns, }; let node = @@ -734,6 +740,7 @@ impl<'a> Builder<'a> { node.typetag, &node.key, &node.outputs, + node.cpu_ns.clone(), ), } } @@ -756,6 +763,7 @@ impl<'a> Builder<'a> { node.input_details.data_type(), outputs, LatencyRecorder::new(self.config.global.latency_ewma_alpha), + node.cpu_ns.clone(), ); let transform = if node.enable_concurrency { runner.run_concurrently().boxed() @@ -799,6 +807,7 @@ impl<'a> Builder<'a> { typetag: &str, key: &ComponentKey, outputs: &[TransformOutput], + cpu_ns: Counter, ) -> (Task, HashMap) { let (mut fanout, control) = Fanout::new(); @@ -866,6 +875,7 @@ impl<'a> Builder<'a> { } } } + .cpu_timed(cpu_ns) .boxed(); let mut outputs = HashMap::new(); @@ -1103,6 +1113,7 @@ struct TransformNode { input_details: Input, outputs: Vec, enable_concurrency: bool, + cpu_ns: Counter, } impl TransformNode { @@ -1119,6 +1130,7 @@ impl TransformNode { input_details: transform.inner.input(), outputs: transform.inner.outputs(context, schema_definition), enable_concurrency: transform.inner.enable_concurrency(), + cpu_ns: context.cpu_ns.clone(), } } } @@ -1142,6 +1154,7 @@ impl Runner { input_type: DataType, outputs: TransformOutputs, latency_recorder: LatencyRecorder, + cpu_ns: Counter, ) -> Self { Self { transform, @@ -1151,7 +1164,7 @@ impl Runner { timer_tx, latency_recorder, events_received: register!(EventsReceived), - cpu_ns: counter!("component_cpu_usage_ns_total"), + cpu_ns, } } diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs index ed55d5aa2dfe8..e358d0cc5157a 100644 --- a/src/transforms/aws_ec2_metadata.rs +++ b/src/transforms/aws_ec2_metadata.rs @@ -30,6 +30,7 @@ use crate::{ config::{ DataType, Input, OutputId, ProxyConfig, TransformConfig, TransformContext, TransformOutput, }, + cpu_time::CpuTimedExt, event::Event, http::HttpClient, internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful}, @@ -237,10 +238,15 @@ impl TransformConfig for Ec2Metadata { } } + // The metadata-refresh loop runs as its own tokio task, so the main + // transform task's `.cpu_timed` wrapper does not see it. Wrap the + // background task with the same component-tagged counter so its CPU + // is attributed to this transform. tokio::spawn( async move { client.run().await; } + .cpu_timed(context.cpu_ns.clone()) // TODO: Once #1338 is done we can fetch the current span .instrument(info_span!("aws_ec2_metadata: worker").or_current()), ); diff --git a/src/transforms/throttle/rate_limiter.rs b/src/transforms/throttle/rate_limiter.rs index 83f1b8de5384b..76004f3c6fbb0 100644 --- a/src/transforms/throttle/rate_limiter.rs +++ b/src/transforms/throttle/rate_limiter.rs @@ -3,8 +3,11 @@ use std::{hash::Hash, sync::Arc, time::Duration}; use governor::{ Quota, RateLimiter, clock, middleware::NoOpMiddleware, state::keyed::DashMapStateStore, }; +use metrics::Counter; use tokio; +use crate::cpu_time::CpuTimedExt; + /// Re-usable wrapper around the structs/type from the governor crate. /// Spawns a background task that periodically flushes keys that haven't been accessed recently. pub struct RateLimiterRunner @@ -21,17 +24,27 @@ where K: Hash + Eq + Clone + Send + Sync + 'static, C: clock::Clock + Clone + Send + Sync + 'static, { - pub fn start(quota: Quota, clock: C, flush_keys_interval: Duration) -> Self { + pub fn start( + quota: Quota, + clock: C, + flush_keys_interval: Duration, + cpu_ns: Counter, + ) -> Self { let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock)); let rate_limiter_clone = Arc::clone(&rate_limiter); - let flush_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(flush_keys_interval); - loop { - interval.tick().await; - rate_limiter_clone.retain_recent(); + // Hook the periodic key-flush task onto the component's CPU counter so + // its housekeeping work is attributed to this throttle transform. + let flush_handle = tokio::spawn( + async move { + let mut interval = tokio::time::interval(flush_keys_interval); + loop { + interval.tick().await; + rate_limiter_clone.retain_recent(); + } } - }); + .cpu_timed(cpu_ns), + ); Self { rate_limiter, diff --git a/src/transforms/throttle/transform.rs b/src/transforms/throttle/transform.rs index 11647c65b2f4c..471582b93e6bb 100644 --- a/src/transforms/throttle/transform.rs +++ b/src/transforms/throttle/transform.rs @@ -3,6 +3,7 @@ use std::{hash::Hash, num::NonZeroU32, pin::Pin, time::Duration}; use async_stream::stream; use futures::{Stream, StreamExt}; use governor::{Quota, clock}; +use metrics::Counter; use snafu::Snafu; use super::{ @@ -26,6 +27,7 @@ pub struct Throttle, I: clock::Reference> { exclude: Option, pub clock: C, internal_metrics: ThrottleInternalMetricsConfig, + cpu_ns: Counter, } impl Throttle @@ -64,6 +66,7 @@ where key_field: config.key_field.clone(), exclude, internal_metrics: config.internal_metrics.clone(), + cpu_ns: context.cpu_ns.clone(), }) } @@ -72,7 +75,12 @@ where where K: Hash + Eq + Clone + Send + Sync + 'static, { - RateLimiterRunner::start(self.quota, self.clock.clone(), self.flush_keys_interval) + RateLimiterRunner::start( + self.quota, + self.clock.clone(), + self.flush_keys_interval, + self.cpu_ns.clone(), + ) } pub fn emit_event_discarded(&self, key: String) { From 0b30d5f11707201394d0eb6c8779ff825c67b108 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 28 Apr 2026 18:23:43 +0200 Subject: [PATCH 17/19] docs(rfc): clarify upstream-component isolation in CPU metric scope Restructure the "Scope of the measurement" rationale bullet to make the upstream-isolation property explicit. Vector components only communicate via BufferReceiver / BufferSender channels (never via stream combinators chained across component boundaries), so polling a task transform's input dequeues items but never runs the upstream's code. Upstream CPU was charged to its own cpu_ns when it ran in its own task. Spell out what is and is not included in cpu_ns. Co-Authored-By: Claude Opus 4.7 (1M context) --- rfcs/2026-04-13-component-cpu-metric.md | 53 ++++++++++++++++++------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/rfcs/2026-04-13-component-cpu-metric.md b/rfcs/2026-04-13-component-cpu-metric.md index c3bd3809745f7..097b109bcd269 100644 --- a/rfcs/2026-04-13-component-cpu-metric.md +++ b/rfcs/2026-04-13-component-cpu-metric.md @@ -108,20 +108,45 @@ This metric is always emitted for transforms; there is no configuration knob. thread it ran on. This isolates the timing concern from the transform body and keeps it robust if the body ever grows `.await` points (which task transforms have many of, by construction). -- **Scope of the measurement.** For sync transforms (inline and concurrent), - the bracket covers exactly the synchronous transform call. For task - transforms, the bracket covers the entire task body, which additionally - includes input-channel polls, the `Utilization`/`OutputUtilization` stream - wrappers' overhead, and the fanout-send loop. In practice the channel/fanout - overhead is small relative to the transform's own work, so the metric remains - a meaningful comparator across transform kinds. -- **Helper tasks are attributed too.** A few transforms spawn long-running - helper tokio tasks at construction time — `aws_ec2_metadata` (periodic IMDS - refresh) and `throttle` via `RateLimiterRunner` (periodic rate-limit-key - flush). The `cpu_ns` counter is plumbed through `TransformContext` so those - helpers can wrap their spawn with the same `.cpu_timed(counter)` and have - their CPU attributed to the same component, rather than being silently - excluded. +- **Scope of the measurement, and isolation from upstream components.** + Vector components communicate only through `BufferReceiver` / `BufferSender` + channels — never through stream combinators chained across component + boundaries. Each component runs in its own tokio task with its own poll + cycles. So when our task polls its input, it dequeues items from a shared + channel buffer; it does **not** run the upstream component's code. The + upstream produced those items earlier, in its own task, and its CPU was + already charged to its own `cpu_ns` counter at that point. This holds even + in the "channel is always full when we poll" case: those items were produced + by upstream CPU that was already counted upstream; we're just dequeueing them. + + The transform's `cpu_ns` therefore **includes**: + + - **Sync transforms** (inline and concurrent): exactly the synchronous + `transform_all` call. + - **Task transforms**: the entire task body — input-channel dequeue, + `Utilization` / `OutputUtilization` stream-wrapper overhead, the + user-defined transform's `poll_next`, the schema/latency `map`, and the + fanout-send loop. A single poll may churn through many items before + tokio's cooperative budget (default 128 ops) forces a `Pending`; all of + that is genuinely this task's work. + - **Helper tokio tasks** the transform spawns at construction time + (`aws_ec2_metadata`'s IMDS-refresh worker, `throttle`'s + `RateLimiterRunner` flush loop): the `cpu_ns` counter is plumbed through + `TransformContext` so those helpers wrap their spawn with the same + `.cpu_timed(counter)`. Their CPU is attributed back to this component + rather than silently excluded. + + And **does not** include: + + - The upstream component's transform/source CPU (stays on upstream's + counter, runs in upstream's task). + - Time our task was parked in `Pending` waiting for input (no polls happen, + so no CPU ticks). + - Other tokio tasks running on other worker threads. + + The channel-poll / fanout-send bookkeeping our wrapper does include is + small relative to the transform's own work, so the metric remains a + meaningful comparator across transform kinds. - **Low overhead.** Two `clock_gettime` calls per poll (~80ns total on Linux) is negligible relative to the work `transform_all` performs. - **No accumulation errors.** The counter stores `u64` nanoseconds; each From 443b5a43e02401dd657bcedf5343eb02039e7a89 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 28 Apr 2026 18:31:50 +0200 Subject: [PATCH 18/19] refactor(topology): pass TransformNode to build_task_transform MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Threading cpu_ns as a separate argument pushed build_task_transform above clippy's too_many_arguments threshold. Mirror build_sync_transform by taking the whole TransformNode and destructuring at the top. The later `let mut outputs = HashMap::new()` shadows the destructured Vec — fine since the Vec is only used earlier when building the schema_definition_map. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/topology/builder.rs | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index fbf9cb01d5eee..eccdcff67f0b3 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -733,15 +733,7 @@ impl<'a> Builder<'a> { // TODO: avoid the double boxing for function transforms here Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx), Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx), - Transform::Task(t) => self.build_task_transform( - t, - input_rx, - node.input_details.data_type(), - node.typetag, - &node.key, - &node.outputs, - node.cpu_ns.clone(), - ), + Transform::Task(t) => self.build_task_transform(t, node, input_rx), } } @@ -802,13 +794,19 @@ impl<'a> Builder<'a> { fn build_task_transform( &self, t: Box>, + node: TransformNode, input_rx: BufferReceiver, - input_type: DataType, - typetag: &str, - key: &ComponentKey, - outputs: &[TransformOutput], - cpu_ns: Counter, ) -> (Task, HashMap) { + let TransformNode { + key, + typetag, + input_details, + outputs, + cpu_ns, + .. + } = node; + let input_type = input_details.data_type(); + let (mut fanout, control) = Fanout::new(); let sender = self @@ -879,9 +877,9 @@ impl<'a> Builder<'a> { .boxed(); let mut outputs = HashMap::new(); - outputs.insert(OutputId::from(key), control); + outputs.insert(OutputId::from(&key), control); - let task = Task::new(key.clone(), typetag, transform); + let task = Task::new(key, typetag, transform); (task, outputs) } From 7a66b66dd7b11d6bcb1ea10a3dbde6f9f6670219 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 28 Apr 2026 18:49:25 +0200 Subject: [PATCH 19/19] fix formatting --- src/transforms/throttle/rate_limiter.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/transforms/throttle/rate_limiter.rs b/src/transforms/throttle/rate_limiter.rs index 76004f3c6fbb0..4b74e5b428e59 100644 --- a/src/transforms/throttle/rate_limiter.rs +++ b/src/transforms/throttle/rate_limiter.rs @@ -24,12 +24,7 @@ where K: Hash + Eq + Clone + Send + Sync + 'static, C: clock::Clock + Clone + Send + Sync + 'static, { - pub fn start( - quota: Quota, - clock: C, - flush_keys_interval: Duration, - cpu_ns: Counter, - ) -> Self { + pub fn start(quota: Quota, clock: C, flush_keys_interval: Duration, cpu_ns: Counter) -> Self { let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock)); let rate_limiter_clone = Arc::clone(&rate_limiter);