diff --git a/Cargo.lock b/Cargo.lock index d013bc09432ec..bb18fb96c7a97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12435,6 +12435,7 @@ dependencies = [ "warp", "windows 0.58.0", "windows-service", + "windows-sys 0.52.0", "wiremock", "zstd 0.13.2", ] diff --git a/Cargo.toml b/Cargo.toml index 84d89c50096f9..bf9998c89697b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -396,6 +396,7 @@ itertools.workspace = true k8s-openapi = { version = "0.27.0", default-features = false, features = ["v1_31"], optional = true } kube = { version = "3.0.1", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true } listenfd = { version = "1.0.2", default-features = false, optional = true } +libc.workspace = true lru = { version = "0.16.3", default-features = false } maxminddb = { version = "0.27.0", default-features = false, optional = true, features = ["simdutf8"] } md-5 = { version = "0.10", default-features = false, optional = true } @@ -451,6 +452,7 @@ sysinfo = "0.37.2" byteorder = "1.5.0" [target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.52", features = ["Win32_Foundation", "Win32_System_Threading"] } windows-service = "0.8.0" windows = { version = "0.58", features = ["Win32_System_EventLog", "Win32_Foundation", "Win32_System_Com", "Win32_Security", "Win32_Security_Authorization", "Win32_System_Threading", "Win32_Storage_FileSystem"], optional = true } quick-xml = { version = "0.31", default-features = false, features = ["serialize"], optional = true } diff --git a/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md new file mode 100644 index 0000000000000..bb56ff649e215 --- /dev/null +++ b/changelog.d/OPA-5012-add-per-component-cpu-metric.feature.md @@ -0,0 +1,5 @@ +Added a new counter metric `component_cpu_usage_ns_total` counting the CPU +time consumed by a transform in nanoseconds. Supported for sync, function, +and task transforms. + +authors: gwenaskell diff --git a/rfcs/2026-04-13-component-cpu-metric.md b/rfcs/2026-04-13-component-cpu-metric.md new file mode 100644 index 0000000000000..097b109bcd269 --- /dev/null +++ b/rfcs/2026-04-13-component-cpu-metric.md @@ -0,0 +1,247 @@ +# RFC 2026-04-13 - Per-component CPU time metric for transforms + +The current `utilization` gauge measures the fraction of wall-clock time a +component is not idle (i.e., not waiting on its input channel). Because sync +and function transforms can run concurrently across multiple tokio worker +threads, and because wall-clock "not idle" includes time the OS has preempted +the thread, this gauge does not accurately reflect how much CPU a component +actually consumes. This RFC proposes a new **counter** metric, +`component_cpu_usage_ns_total`, that tracks the cumulative CPU time consumed +by a component's transform work in nanoseconds, measured via OS thread-level +CPU clocks. + +## Context + +- The existing `utilization` metric is implemented in `src/utilization.rs`. +- Sync and function transforms are spawned in `src/topology/builder.rs` + via the `Runner` struct (`run_inline` and `run_concurrently` methods). +- Task transforms are built in `src/topology/builder.rs::build_task_transform` + and run as a single async task driving a stream-based pipeline. +- The `enable_concurrency` trait method controls whether a sync transform is + dispatched to parallel `tokio::spawn` tasks (up to + `TRANSFORM_CONCURRENCY_LIMIT`, which defaults to the number of worker + threads). 
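To make the mechanism concrete before the discussion below, here is a minimal sketch of the OS primitive the precise tier builds on (Linux/macOS; `thread_cpu_now` is an illustrative name — the `ThreadTime` type added by this PR wraps exactly this call and adds the Windows and wall-clock fallback paths):

```rust
use std::time::Duration;

/// Snapshot the calling thread's CPU clock. Unlike wall-clock time, this
/// clock only advances while the thread is actually scheduled on a CPU.
fn thread_cpu_now() -> Duration {
    let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 };
    // SAFETY: `ts` is a valid out-pointer and CLOCK_THREAD_CPUTIME_ID is
    // supported on every Linux/macOS version Vector targets.
    unsafe {
        libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts);
    }
    Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32)
}

fn main() {
    let t0 = thread_cpu_now();
    let _: u64 = (0..10_000_000u64).sum(); // burn some CPU
    // Per-thread CPU clocks are monotone, so this subtraction cannot underflow.
    println!("thread CPU time: {:?}", thread_cpu_now() - t0);
}
```

Bracketing a unit of work with two such samples yields the work's CPU cost regardless of how often the OS preempted the thread in between — the property the wall-clock-based `utilization` gauge lacks.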
## Cross cutting concerns

- The `utilization` gauge remains as-is. This RFC adds a complementary metric;
  it does not replace the existing one.
- Future work could extend this approach to sources and sinks.

## Scope

### In scope

- A new `component_cpu_usage_ns_total` counter for **all transforms** —
  sync and function transforms (both inline and concurrent execution paths)
  and task transforms.
- Two implementation tiers: a wall-clock fallback that works everywhere, and a
  precise thread-CPU-time implementation using OS APIs.
- Feasibility analysis of thread-level CPU time measurement.

### Out of scope

- Sources and sinks.
- Replacing or modifying the existing `utilization` gauge.

## Pain

1. **Utilization is misleading under concurrency.** In the concurrent
   `run_concurrently` path, the utilization timer stays in "not waiting" state
   from the moment events are received (`stop_wait` in `on_events_received`)
   until a completed task's output is sent (`start_wait` in `send_outputs`).
   The actual CPU work happens on separate `tokio::spawn`'d tasks that the
   timer does not track. This means utilization measures **occupancy** (is
   there at least one batch in flight?) rather than CPU consumption.

   Concrete example: a concurrent remap with 4 in-flight tasks each taking
   10ms, input arriving every 5ms. Input arrives frequently enough that
   `stop_wait` fires before each spawn, keeping the timer in "not waiting"
   almost continuously → utilization ≈ 100%. But actual CPU consumption is
   4 × 10ms / 20ms = 2 cores. The utilization gauge cannot distinguish
   "2 cores" from "0.3 cores at 100% occupancy."

2. **No way to detect CPU-bound transforms.** Operators tuning pipelines need to
   know which transforms are CPU-bottlenecked. A `component_cpu_usage_ns_total`
   counter, when divided by wall-clock time (in ns), directly gives CPU core
   utilization and can exceed 1.0 when a transform genuinely uses multiple
   cores.

## Proposal

### User Experience

A new counter metric is emitted for every transform:

```prometheus
component_cpu_usage_ns_total{component_id="my_remap",component_kind="transform",component_type="remap"} 14207
```

The value is cumulative CPU nanoseconds consumed by the component. Operators
use it to compute CPU core utilization:

```promql
# Per-component CPU core usage (can exceed 1.0 with concurrency)
rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9

# Compare against utilization to separate CPU cost from pipeline pressure
rate(component_cpu_usage_ns_total{component_id="my_remap"}[1m]) / 1e9
  /
utilization{component_id="my_remap"}
```

This metric is always emitted for transforms; there is no configuration knob.

## Rationale

- **Direct CPU cost visibility.** Operators can identify which transforms are
  CPU-bottlenecked vs. backpressure-limited, enabling informed tuning.
- **Composable with existing metrics.** `rate(component_cpu_usage_ns_total[1m]) / 1e9`
  gives CPU cores used; dividing by `utilization` separates CPU from pipeline effects.
- **Measurement is hooked at the task's poll boundary.** For the concurrent
  sync path and for task transforms, the spawned tokio task's future is wrapped
  in an adapter that samples thread CPU time around every call to
  `Future::poll`.
  Tokio's cooperative scheduler guarantees that within a single
  poll the task cannot be moved to another worker thread and no other task can
  run on the current thread, so each `(before_poll, after_poll)` pair is a
  clean per-thread measurement. Multiple polls (across `Pending` returns and
  wake-ups) accumulate correctly, with each poll independently sampling the
  thread it ran on. This isolates the timing concern from the transform body
  and keeps it robust if the body ever grows `.await` points (which task
  transforms have many of, by construction).
- **Scope of the measurement, and isolation from upstream components.**
  Vector components communicate only through `BufferReceiver` / `BufferSender`
  channels — never through stream combinators chained across component
  boundaries. Each component runs in its own tokio task with its own poll
  cycles. So when our task polls its input, it dequeues items from a shared
  channel buffer; it does **not** run the upstream component's code. The
  upstream produced those items earlier, in its own task, and its CPU was
  already charged to its own `cpu_ns` counter at that point. This holds even
  in the "channel is always full when we poll" case: those items were produced
  by upstream CPU that was already counted upstream; we're just dequeueing them.

  The transform's `cpu_ns` therefore **includes**:

  - **Sync transforms** (inline and concurrent): exactly the synchronous
    `transform_all` call.
  - **Task transforms**: the entire task body — input-channel dequeue,
    `Utilization` / `OutputUtilization` stream-wrapper overhead, the
    user-defined transform's `poll_next`, the schema/latency `map`, and the
    fanout-send loop. A single poll may churn through many items before
    tokio's cooperative budget (default 128 ops) forces a `Pending`; all of
    that is genuinely this task's work.
  - **Helper tokio tasks** the transform spawns at construction time
    (`aws_ec2_metadata`'s IMDS-refresh worker, `throttle`'s
    `RateLimiterRunner` flush loop): the `cpu_ns` counter is plumbed through
    `TransformContext` so those helpers wrap their spawn with the same
    `.cpu_timed(counter)`. Their CPU is attributed back to this component
    rather than silently excluded.

  And **does not** include:

  - The upstream component's transform/source CPU (stays on upstream's
    counter, runs in upstream's task).
  - Time our task was parked in `Pending` waiting for input (no polls happen,
    so no CPU ticks).
  - Other tokio tasks running on other worker threads.

  The channel-poll / fanout-send bookkeeping that the wrapper does capture is
  small relative to the transform's own work, so the metric remains a
  meaningful comparator across transform kinds.
- **Low overhead.** Two `clock_gettime` calls per poll (~80ns total on Linux)
  are negligible relative to the work `transform_all` performs.
- **No accumulation errors.** The counter stores `u64` nanoseconds; each
  increment is exact integer arithmetic. The single `u64 → f64` cast at scrape
  time has bounded, non-accumulated error.

## Drawbacks

- **Platform-specific code.** The precise implementation uses `cfg`-gated FFI
  for Linux, macOS, and Windows. Other platforms fall back to wall-clock time,
  giving two maintained precise code paths (Linux and macOS share one) plus
  the fallback.

## Alternatives

### Extend the existing utilization gauge

Add a CPU-time-based "utilization v2" that replaces the current gauge.
**Rejected because:** The current utilization metric serves a different purpose
(pipeline flow analysis: is this component starved or saturated?). CPU time is a
complementary signal, not a replacement. Conflating them would lose information.

### Per-event latency histogram

Emit a histogram of per-event processing time instead of a cumulative counter.

**Rejected because:** Histograms are expensive at high throughput (Vector
processes millions of events/sec). A counter that increments once per batch is
far cheaper. Per-event latency can be derived from the counter and
`events_sent_total` if needed (`cpu_ns / events = avg cpu ns per event`).

### `getrusage(RUSAGE_THREAD)` instead of `clock_gettime`

On Linux, `getrusage(RUSAGE_THREAD)` also provides per-thread CPU time (as
`ru_utime` + `ru_stime`).

**Not preferred because:** `clock_gettime(CLOCK_THREAD_CPUTIME_ID)` has
nanosecond precision vs. microsecond for `getrusage`, and both calls are
similarly cheap on modern kernels, so the higher precision comes at no extra
cost.

## Outstanding Questions

1. **User/system split:** Should we report user and system CPU time separately
   (as `mode="user"` / `mode="system"` tags) like `host_cpu_seconds_total`
   does? The Linux API supports this. It adds cardinality but helps distinguish
   transforms that trigger syscalls (e.g., enrichment table lookups) from pure
   computation.

## Plan Of Attack

- Add a `src/cpu_time.rs` module exposing:
  - A `ThreadTime` snapshot with platform-specific implementations behind
    `#[cfg]` gates (Linux/macOS `CLOCK_THREAD_CPUTIME_ID`, Windows
    `GetThreadTimes`, wall-clock fallback elsewhere). Include unit tests that
    verify the returned duration is non-negative and monotone.
  - A `CpuTimedFuture` adapter that wraps a future and, on every
    `Future::poll`, samples `ThreadTime` before and after the inner poll and
    increments a `metrics::Counter` by the delta. A `CpuTimedExt` extension
    trait exposes it as a chained `.cpu_timed(counter)` call, mirroring
    `tracing::Instrument::in_current_span`.
- Add a `cpu_ns: Counter` field to `TransformContext`, defaulting to
  `Counter::noop()`. In `build_transforms`, register the counter inside the
  transform `error_span!` so it is tagged with `component_id`,
  `component_kind`, and `component_type`, then store it on the context. This
  is the single resolved handle every transform path consumes — sync, task,
  and any helper tokio tasks — so labels and recorder lookup are paid once,
  not on every poll. Also propagate the same handle through `TransformNode`
  for the topology builder's own use.
- For `run_inline`, bracket the synchronous `transform_all` call directly with
  `ThreadTime::now()` / `elapsed()`. The transform task itself owns this code
  and there is no `.await` between the brackets, so inline measurement is the
  simplest correct option.
- For `run_concurrently`, wrap the spawned per-batch future via
  `.cpu_timed(cpu_ns.clone())` rather than measuring inline. This hooks the
  measurement onto the task's `Future::poll` boundary and makes the pattern
  uniform for any future async work added inside the spawned body.
- For `build_task_transform`, take the counter from `TransformNode` (which
  receives it from the context) and wrap the outer task future with
  `.cpu_timed(counter)` before `.boxed()`. CPU time accumulates across every
  poll of the task, naturally excluding time the task is parked in `Pending`.
+- For transforms that spawn helper tokio tasks at construction time + (`aws_ec2_metadata`, `throttle`'s `RateLimiterRunner`), read + `context.cpu_ns.clone()` in `build()` and `.cpu_timed(...)` the spawned + helper future. For `RateLimiterRunner::start`, plumb the counter through as + a parameter so it stays the throttle config's responsibility to provide it. +- Add integration test: run a CPU-intensive remap transform, verify + `component_cpu_usage_ns_total` is within 10% of expected CPU time. +- Add documentation for the new metric in the generated component docs. +- Add changelog fragment. + +## Future Improvements + +- Extend to **sources and sinks** where the component owns a synchronous + processing step (e.g., codec encoding in sinks). +- Expose a derived **`cpu_utilization` gauge** (CPU seconds / wall seconds) + computed by the `UtilizationEmitter` for operators who prefer a ready-to-use + ratio. +- Add `mode="user"` / `mode="system"` tag split for deeper CPU profiling. diff --git a/src/config/transform.rs b/src/config/transform.rs index 7f5d009030921..586a4923ac91e 100644 --- a/src/config/transform.rs +++ b/src/config/transform.rs @@ -6,6 +6,7 @@ use std::{ use async_trait::async_trait; use dyn_clone::DynClone; +use metrics::Counter; use serde::Serialize; use vector_lib::{ config::{GlobalOptions, Input, LogNamespace, TransformOutput}, @@ -141,6 +142,12 @@ pub struct TransformContext { /// Extra context data provided by the running app and shared across all components. This can be /// used to pass shared settings or other data from outside the components. pub extra_context: ExtraContext, + + /// Counter handle for `component_cpu_usage_ns_total`, pre-tagged with this transform's + /// component identity. Transforms that spawn helper tokio tasks at construction time + /// (e.g. `aws_ec2_metadata`, `throttle`) clone this and `.cpu_timed(...)` those tasks so + /// their CPU is attributed to the component alongside the main transform task. + pub cpu_ns: Counter, } impl Default for TransformContext { @@ -154,6 +161,7 @@ impl Default for TransformContext { merged_schema_definition: schema::Definition::any(), schema: SchemaOptions::default(), extra_context: Default::default(), + cpu_ns: Counter::noop(), } } } diff --git a/src/cpu_time.rs b/src/cpu_time.rs new file mode 100644 index 0000000000000..24068be8553b4 --- /dev/null +++ b/src/cpu_time.rs @@ -0,0 +1,251 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use metrics::Counter; +use pin_project::pin_project; + +/// An opaque snapshot of thread CPU time. +/// +/// On Linux and macOS this uses `CLOCK_THREAD_CPUTIME_ID`, which measures +/// only the time the calling thread was actually scheduled on a CPU (true CPU +/// time, excluding preemption and context switches to other threads/processes). +/// +/// On Windows this uses `GetThreadTimes`, which provides the same guarantee +/// with 100ns granularity. +/// +/// On other platforms this falls back to wall-clock time via +/// [`std::time::Instant`]. 
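///
/// Note that the wall-clock fallback over-approximates CPU usage: between the
/// two measurement points it also counts time the thread spent preempted or
/// otherwise off-CPU, not just time spent executing.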
+/// +/// # Usage +/// +/// Call [`ThreadTime::now`] immediately before the work to measure, then call +/// [`ThreadTime::elapsed`] immediately after: +/// +/// ```ignore +/// let t0 = ThreadTime::now(); +/// do_work(); +/// let cpu_time = t0.elapsed(); +/// ``` +/// +/// # Correctness for sync transforms +/// +/// This measurement is accurate for [`crate::transforms::SyncTransform`] +/// because `transform_all` is synchronous and non-yielding: between the two +/// measurement points the worker thread runs only transform code, with no +/// `.await` points that could interleave other tokio tasks. +pub(crate) struct ThreadTime(Inner); + +impl ThreadTime { + /// Captures the current thread CPU time. + #[inline] + pub(crate) fn now() -> Self { + ThreadTime(Inner::now()) + } + + /// Returns the CPU time elapsed since this snapshot was taken. + #[inline] + pub(crate) fn elapsed(&self) -> Duration { + self.0.elapsed() + } +} + +// ── Linux / macOS: CLOCK_THREAD_CPUTIME_ID ──────────────────────────────── + +#[cfg(any(target_os = "linux", target_os = "macos"))] +struct Inner(Duration); + +#[cfg(any(target_os = "linux", target_os = "macos"))] +impl Inner { + fn now() -> Self { + let mut ts = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + // SAFETY: + // - `ts` is a valid, zero-initialised `timespec` on the stack. + // - `CLOCK_THREAD_CPUTIME_ID` is a valid clock ID on Linux ≥ 2.6 and + // macOS ≥ 10.12 (both are baseline requirements for Vector). + // - The return value is intentionally ignored: the only failure modes + // are an invalid clock ID (not the case here) or an invalid pointer + // (not the case here), neither of which can occur. + unsafe { + libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut ts); + } + Inner(Duration::new(ts.tv_sec as u64, ts.tv_nsec as u32)) + } + + #[inline] + fn elapsed(&self) -> Duration { + Self::now().0.saturating_sub(self.0) + } +} + +// ── Windows: GetThreadTimes ─────────────────────────────────────────────── + +#[cfg(target_os = "windows")] +struct Inner(Duration); + +#[cfg(target_os = "windows")] +impl Inner { + fn now() -> Self { + use windows_sys::Win32::Foundation::FILETIME; + use windows_sys::Win32::System::Threading::{GetCurrentThread, GetThreadTimes}; + + let mut creation = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut exit = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut kernel = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + let mut user = FILETIME { + dwLowDateTime: 0, + dwHighDateTime: 0, + }; + + // SAFETY: + // - `GetCurrentThread()` returns a pseudo-handle that is always valid + // and does not need to be closed. + // - All four `FILETIME` pointers are valid, properly aligned, and + // stack-allocated. + // - The return value is intentionally ignored: failure is only possible + // with an invalid handle, which cannot occur with `GetCurrentThread()`. + unsafe { + GetThreadTimes( + GetCurrentThread(), + &mut creation, + &mut exit, + &mut kernel, + &mut user, + ); + } + + // Combine the low/high halves of each FILETIME into a u64, then sum + // kernel + user. FILETIME units are 100-nanosecond intervals. 
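        // Although FILETIME carries 100 ns units, the kernel typically
        // advances these counters at scheduler-tick granularity, so a single
        // short poll may observe a zero delta; deltas still sum correctly
        // across many polls.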
        let kernel_ns = filetime_to_nanos(kernel);
        let user_ns = filetime_to_nanos(user);
        Inner(Duration::from_nanos(kernel_ns + user_ns))
    }

    #[inline]
    fn elapsed(&self) -> Duration {
        Self::now().0.saturating_sub(self.0)
    }
}

#[cfg(target_os = "windows")]
#[inline]
fn filetime_to_nanos(ft: windows_sys::Win32::Foundation::FILETIME) -> u64 {
    let ticks = ((ft.dwHighDateTime as u64) << 32) | (ft.dwLowDateTime as u64);
    ticks * 100 // convert 100ns intervals to nanoseconds
}

// ── Other platforms: wall-clock fallback ────────────────────────────────── 

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
struct Inner(std::time::Instant);

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
impl Inner {
    fn now() -> Self {
        Inner(std::time::Instant::now())
    }

    #[inline]
    fn elapsed(&self) -> Duration {
        self.0.elapsed()
    }
}

// ── CpuTimedFuture: per-poll CPU time accumulator ───────────────────────── 

/// A [`Future`] adapter that accumulates thread CPU time across every `poll`.
///
/// Each call to [`Future::poll`] is bracketed by a [`ThreadTime`] sample;
/// the delta is added to `counter`. Tokio's executor cannot migrate a task
/// to another worker thread or run another task on the current thread between
/// the entry and exit of a single `poll`, so each delta is a clean per-thread
/// CPU-time measurement of the wrapped future's work for that poll. Multiple
/// polls (across `Pending` returns and wake-ups) accumulate into the same
/// counter, with each poll independently sampling the thread it ran on.
///
/// This is the per-task analogue of tokio's unstable
/// `on_before_task_poll` / `on_after_task_poll` runtime hooks: it hooks the
/// same boundary, but on a single future rather than the whole runtime, and
/// it works on stable Rust without `--cfg tokio_unstable`.
///
/// Construct it via [`CpuTimedExt::cpu_timed`].
#[pin_project]
pub(crate) struct CpuTimedFuture<F> {
    #[pin]
    inner: F,
    counter: Counter,
}

impl<F: Future> Future for CpuTimedFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let t0 = ThreadTime::now();
        let result = this.inner.poll(cx);
        this.counter.increment(t0.elapsed().as_nanos() as u64);
        result
    }
}

/// Extension trait that wraps a future in [`CpuTimedFuture`] via a chained
/// call:
///
/// ```ignore
/// async move { /* work */ }.cpu_timed(counter)
/// ```
///
/// Mirrors the style of [`tracing::Instrument::in_current_span`].
pub(crate) trait CpuTimedExt: Future + Sized {
    fn cpu_timed(self, counter: Counter) -> CpuTimedFuture<Self> {
        CpuTimedFuture {
            inner: self,
            counter,
        }
    }
}

impl<F: Future> CpuTimedExt for F {}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn elapsed_is_non_negative() {
        let t0 = ThreadTime::now();
        // Burn a small amount of CPU to ensure the clock advances.
        let _: u64 = (0u64..10_000).sum();
        assert!(t0.elapsed() >= Duration::ZERO);
    }

    #[test]
    fn elapsed_is_monotone() {
        // Two consecutive elapsed() calls on the same snapshot must be
        // non-decreasing (the clock never goes backwards).
        let t0 = ThreadTime::now();
        let _: u64 = (0u64..10_000).sum();
        let first = t0.elapsed();
        let _: u64 = (0u64..10_000).sum();
        let second = t0.elapsed();
        assert!(
            second >= first,
            "clock went backwards: {second:?} < {first:?}"
        );
    }
}
diff --git a/src/lib.rs b/src/lib.rs
index 8d26a3b080ecf..2a876e05ea984 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -81,6 +81,7 @@ pub mod aws;
 pub mod common;
 pub mod completion;
 mod convert_config;
+pub(crate) mod cpu_time;
 pub mod encoding_transcode;
 pub mod enrichment_tables;
 pub mod extra_context;
diff --git a/src/topology/builder.rs b/src/topology/builder.rs
index 8048b7a6073ed..eccdcff67f0b3 100644
--- a/src/topology/builder.rs
+++ b/src/topology/builder.rs
@@ -8,7 +8,7 @@ use std::{
 use futures::{FutureExt, StreamExt, TryStreamExt, stream::FuturesOrdered};
 use futures_util::stream::FuturesUnordered;
-use metrics::gauge;
+use metrics::{Counter, counter, gauge};
 use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
 use tokio::{
     select,
@@ -45,6 +45,7 @@ use crate::{
         ComponentKey, Config, DataType, EnrichmentTableConfig, Input, Inputs, OutputId, ProxyConfig,
         SinkContext, SourceContext, TransformContext, TransformOuter, TransformOutput,
     },
+    cpu_time::{CpuTimedExt, ThreadTime},
     event::{EventArray, EventContainer},
     extra_context::ExtraContext,
     internal_events::EventsReceived,
@@ -481,6 +482,11 @@
             })
             .collect::<Vec<_>>();

+        // Resolve the per-component CPU counter inside the transform span so it
+        // picks up component_id/component_kind/component_type tags. The same
+        // handle is shared between the main transform task and any helper
+        // tokio tasks the transform spawns at construction time.
+        let cpu_ns = counter!("component_cpu_usage_ns_total");
         let context = TransformContext {
             key: Some(key.clone()),
             globals: self.config.global.clone(),
@@ -490,6 +496,7 @@
             merged_schema_definition: merged_definition.clone(),
             schema: self.config.schema,
             extra_context: self.extra_context.clone(),
+            cpu_ns,
         };

         let node =
@@ -726,14 +733,7 @@
             // TODO: avoid the double boxing for function transforms here
             Transform::Function(t) => self.build_sync_transform(Box::new(t), node, input_rx),
             Transform::Synchronous(t) => self.build_sync_transform(t, node, input_rx),
-            Transform::Task(t) => self.build_task_transform(
-                t,
-                input_rx,
-                node.input_details.data_type(),
-                node.typetag,
-                &node.key,
-                &node.outputs,
-            ),
+            Transform::Task(t) => self.build_task_transform(t, node, input_rx),
         }
     }
@@ -755,6 +755,7 @@
             node.input_details.data_type(),
             outputs,
             LatencyRecorder::new(self.config.global.latency_ewma_alpha),
+            node.cpu_ns.clone(),
         );
         let transform = if node.enable_concurrency {
             runner.run_concurrently().boxed()
@@ -793,12 +794,19 @@
     fn build_task_transform(
         &self,
         t: Box<dyn TaskTransform<EventArray>>,
+        node: TransformNode,
         input_rx: BufferReceiver<EventArray>,
-        input_type: DataType,
-        typetag: &str,
-        key: &ComponentKey,
-        outputs: &[TransformOutput],
     ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
+        let TransformNode {
+            key,
+            typetag,
+            input_details,
+            outputs,
+            cpu_ns,
+            ..
+        } = node;
+        let input_type = input_details.data_type();
+
         let (mut fanout, control) = Fanout::new();

         let sender = self
@@ -865,12 +873,13 @@
                 }
             }
         }
+        .cpu_timed(cpu_ns)
         .boxed();

         let mut outputs = HashMap::new();
-        outputs.insert(OutputId::from(key), control);
+        outputs.insert(OutputId::from(&key), control);

-        let task = Task::new(key.clone(), typetag, transform);
+        let task = Task::new(key, typetag, transform);

         (task, outputs)
     }
@@ -1102,6 +1111,7 @@ struct TransformNode {
     input_details: Input,
     outputs: Vec<TransformOutput>,
     enable_concurrency: bool,
+    cpu_ns: Counter,
 }

 impl TransformNode {
@@ -1118,6 +1128,7 @@
             input_details: transform.inner.input(),
             outputs: transform.inner.outputs(context, schema_definition),
             enable_concurrency: transform.inner.enable_concurrency(),
+            cpu_ns: context.cpu_ns.clone(),
         }
     }
 }
@@ -1130,6 +1141,7 @@ struct Runner {
     timer_tx: UtilizationComponentSender,
     latency_recorder: LatencyRecorder,
     events_received: Registered<EventsReceived>,
+    cpu_ns: Counter,
 }

 impl Runner {
@@ -1140,6 +1152,7 @@
         input_type: DataType,
         outputs: TransformOutputs,
         latency_recorder: LatencyRecorder,
+        cpu_ns: Counter,
     ) -> Self {
         Self {
             transform,
@@ -1149,6 +1162,7 @@
             timer_tx,
             latency_recorder,
             events_received: register!(EventsReceived),
+            cpu_ns,
         }
     }
@@ -1184,7 +1198,9 @@
         self.timer_tx.try_send_start_wait();
         while let Some(events) = input_rx.next().await {
             self.on_events_received(&events);
+            let t0 = ThreadTime::now();
             self.transform.transform_all(events, &mut outputs_buf);
+            self.cpu_ns.increment(t0.elapsed().as_nanos() as u64);
             self.send_outputs(&mut outputs_buf)
                 .await
                 .map_err(TaskError::wrapped)?;
@@ -1233,12 +1249,22 @@
                         let mut t = self.transform.clone();
                         let mut outputs_buf = self.outputs.new_buf_with_capacity(len);

-                        let task = tokio::spawn(async move {
-                            for events in input_arrays {
-                                t.transform_all(events, &mut outputs_buf);
+                        // Hook CPU-time accounting onto the spawned task at
+                        // the `Future::poll` boundary. The transform body
+                        // contains no `.await`, so a single poll runs to
+                        // completion on one worker thread; if a future
+                        // refactor adds awaits, accumulation across polls
+                        // remains correct.
+                        let task = tokio::spawn(
+                            async move {
+                                for events in input_arrays {
+                                    t.transform_all(events, &mut outputs_buf);
+                                }
+                                outputs_buf
                             }
-                            outputs_buf
-                        }.in_current_span());
+                            .cpu_timed(self.cpu_ns.clone())
+                            .in_current_span(),
+                        );
                         in_flight.push_back(task);
                     }
                     None => {
diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs
index ed55d5aa2dfe8..e358d0cc5157a 100644
--- a/src/transforms/aws_ec2_metadata.rs
+++ b/src/transforms/aws_ec2_metadata.rs
@@ -30,6 +30,7 @@ use crate::{
     config::{
         DataType, Input, OutputId, ProxyConfig, TransformConfig, TransformContext, TransformOutput,
     },
+    cpu_time::CpuTimedExt,
     event::Event,
     http::HttpClient,
     internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
@@ -237,10 +238,15 @@ impl TransformConfig for Ec2Metadata {
             }
         }

+        // The metadata-refresh loop runs as its own tokio task, so the main
+        // transform task's `.cpu_timed` wrapper does not see it. Wrap the
+        // background task with the same component-tagged counter so its CPU
+        // is attributed to this transform.
         tokio::spawn(
             async move {
                 client.run().await;
             }
+            .cpu_timed(context.cpu_ns.clone())
             // TODO: Once #1338 is done we can fetch the current span
             .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
         );
diff --git a/src/transforms/throttle/rate_limiter.rs b/src/transforms/throttle/rate_limiter.rs
index 83f1b8de5384b..4b74e5b428e59 100644
--- a/src/transforms/throttle/rate_limiter.rs
+++ b/src/transforms/throttle/rate_limiter.rs
@@ -3,8 +3,11 @@
 use std::{hash::Hash, sync::Arc, time::Duration};

 use governor::{
     Quota, RateLimiter, clock, middleware::NoOpMiddleware, state::keyed::DashMapStateStore,
 };
+use metrics::Counter;
 use tokio;

+use crate::cpu_time::CpuTimedExt;
+
 /// Re-usable wrapper around the structs/type from the governor crate.
 /// Spawns a background task that periodically flushes keys that haven't been accessed recently.
 pub struct RateLimiterRunner<K, C>
 where
     K: Hash + Eq + Clone + Send + Sync + 'static,
     C: clock::Clock + Clone + Send + Sync + 'static,
 {
-    pub fn start(quota: Quota, clock: C, flush_keys_interval: Duration) -> Self {
+    pub fn start(quota: Quota, clock: C, flush_keys_interval: Duration, cpu_ns: Counter) -> Self {
         let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock));
         let rate_limiter_clone = Arc::clone(&rate_limiter);
-        let flush_handle = tokio::spawn(async move {
-            let mut interval = tokio::time::interval(flush_keys_interval);
-            loop {
-                interval.tick().await;
-                rate_limiter_clone.retain_recent();
+        // Hook the periodic key-flush task onto the component's CPU counter so
+        // its housekeeping work is attributed to this throttle transform.
+        let flush_handle = tokio::spawn(
+            async move {
+                let mut interval = tokio::time::interval(flush_keys_interval);
+                loop {
+                    interval.tick().await;
+                    rate_limiter_clone.retain_recent();
+                }
             }
-        });
+            .cpu_timed(cpu_ns),
+        );

         Self {
             rate_limiter,
diff --git a/src/transforms/throttle/transform.rs b/src/transforms/throttle/transform.rs
index 11647c65b2f4c..471582b93e6bb 100644
--- a/src/transforms/throttle/transform.rs
+++ b/src/transforms/throttle/transform.rs
@@ -3,6 +3,7 @@
 use std::{hash::Hash, num::NonZeroU32, pin::Pin, time::Duration};

 use async_stream::stream;
 use futures::{Stream, StreamExt};
 use governor::{Quota, clock};
+use metrics::Counter;
 use snafu::Snafu;

 use super::{
@@ -26,6 +27,7 @@ pub struct Throttle<C: clock::Clock<Instant = I>, I: clock::Reference> {
     exclude: Option<Condition>,
     pub clock: C,
     internal_metrics: ThrottleInternalMetricsConfig,
+    cpu_ns: Counter,
 }

 impl<C, I> Throttle<C, I>
 where
@@ -64,6 +66,7 @@
             key_field: config.key_field.clone(),
             exclude,
             internal_metrics: config.internal_metrics.clone(),
+            cpu_ns: context.cpu_ns.clone(),
         })
     }

@@ -72,7 +75,12 @@
     where
         K: Hash + Eq + Clone + Send + Sync + 'static,
     {
-        RateLimiterRunner::start(self.quota, self.clock.clone(), self.flush_keys_interval)
+        RateLimiterRunner::start(
+            self.quota,
+            self.clock.clone(),
+            self.flush_keys_interval,
+            self.cpu_ns.clone(),
+        )
     }

     pub fn emit_event_discarded(&self, key: String) {
diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue
index 98ed1ffe025d7..c06e3089ee264 100644
--- a/website/cue/reference/components/sources/internal_metrics.cue
+++ b/website/cue/reference/components/sources/internal_metrics.cue
@@ -359,6 +359,12 @@ components: sources: internal_metrics: {
 			default_namespace: "vector"
 			tags:              _component_tags
 		}
+		component_cpu_usage_ns_total: {
+			description:       "The CPU time consumed by a component in nanoseconds. Available for sync, function, and task transforms."
+			type:              "counter"
+			default_namespace: "vector"
+			tags:              _component_tags
+		}
 		component_discarded_events_total: {
 			description: "The number of events dropped by this component."
 			type:        "counter"