diff --git a/perf-changelog.yaml b/perf-changelog.yaml index def63fd87..69d5c7d8a 100644 --- a/perf-changelog.yaml +++ b/perf-changelog.yaml @@ -3171,3 +3171,9 @@ description: - "Validates measured-power aggregation pipeline (PR #1558) on both NVIDIA (H200) and AMD (MI355X) hardware — different SMI tools (nvidia-smi vs amd-smi), different CSV schemas (power.draw [W] vs socket_power), same aggregator. No config change. Entry intentionally kept past merge so run-sweep produces canonical agg JSONs with avg_power_w + joules_per_output_token on main for both vendors, seeding the dashboard's day-zero data." pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1558 + +- config-keys: + - dsv4-fp4-gb300-dynamo-sglang + description: + - "Smoke run validating multinode measured-power aggregation (PR #1574). No config change; entry exists to trigger a sweep that produces the first multinode agg JSON with avg_power_w + joules_per_*_token populated from per-node srt-slurm perfmon CSVs. Validates per-source GPU-id namespacing in aggregate_power.py (without it, 14 nodes × 4 GPUs would report num_gpus=4 instead of 56) and the GPU_METRICS_CSV_GLOB env var bridge in process_result.py. Only the gb300-cw runner has the perfmon launcher changes; any gb300-nv runs in the sweep will succeed normally without power fields, which the dashboard handles gracefully (chart gates on field presence)." + pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1574 diff --git a/runners/launch_gb300-cw.sh b/runners/launch_gb300-cw.sh index 25e7f4db5..951c350de 100644 --- a/runners/launch_gb300-cw.sh +++ b/runners/launch_gb300-cw.sh @@ -12,8 +12,13 @@ if [[ $MODEL_PREFIX == "dsv4" && $PRECISION == "fp4" ]]; then export MODEL_PATH="/mnt/vast/models/dsv4" if [[ $FRAMEWORK == "dynamo-sglang" ]]; then - SRT_SLURM_RECIPES_REPO="https://github.com/NVIDIA/srt-slurm.git" - SRT_SLURM_RECIPES_REF="main" + # Pinned to our SemiAnalysisAI fork of NVIDIA/srt-slurm to pick up + # PR #35 (per-node nvidia-smi monitoring during the benchmark sweep) + # ahead of its upstream merge. The branch tracks PR #35's head SHA: + # to bump, re-fetch refs/pull/35/head from NVIDIA/srt-slurm and force- + # push to SemiAnalysisAI/srt-slurm:feat/inferencex-perfmon. + SRT_SLURM_RECIPES_REPO="https://github.com/SemiAnalysisAI/srt-slurm.git" + SRT_SLURM_RECIPES_REF="feat/inferencex-perfmon" SRT_RECIPE_SRC="$GITHUB_WORKSPACE/benchmarks/multi_node/srt-slurm-recipes/sglang/deepseek-v4" SRT_RECIPE_DST="recipes/sglang/deepseek-v4" elif [[ $FRAMEWORK == "dynamo-vllm" ]]; then @@ -106,6 +111,30 @@ git checkout "$SRT_SLURM_RECIPES_REF" mkdir -p "$SRT_RECIPE_DST" cp -rT "$SRT_RECIPE_SRC" "$SRT_RECIPE_DST" +# Enable per-node GPU perfmon (PR #35) on every overlaid recipe. `monitoring` +# is a top-level SrtConfig field and defaults to None, so without this the +# orchestrator's _start_perf_monitor short-circuits and no perf_samples_*.csv +# are ever written — multinode measured-power aggregation would silently +# skip. Idempotent: skips recipes that already declare `monitoring:`. +# +# CRITICAL: use `find` recursively, not a flat `*.yaml` glob. Recipes live +# in $SRT_RECIPE_DST//*.yaml (e.g. .../8k1k/*.yaml) — a flat glob +# matches zero files, the loop runs zero times, no recipe gets monitoring, +# and perfmon never spawns. PR #1574's first real sweep (#26548110246) hit +# exactly this: completed "success" with no power data because the glob +# matched nothing and the failure was silent end-to-end. +INJECTED_COUNT=0 +while IFS= read -r recipe; do + if ! grep -q '^monitoring:' "$recipe"; then + printf '\nmonitoring:\n enabled: true\n sample_interval: 1.0\n' >> "$recipe" + echo "[perfmon] enabled monitoring in recipe: $recipe" + INJECTED_COUNT=$((INJECTED_COUNT + 1)) + fi +done < <(find "$SRT_RECIPE_DST" -type f -name '*.yaml') +if [ "$INJECTED_COUNT" -eq 0 ]; then + echo "[perfmon] WARNING: zero recipes received monitoring injection under $SRT_RECIPE_DST. Either every recipe already had it, or the directory layout changed — power data will be MISSING from this run." >&2 +fi + echo "Installing srtctl..." # CRITICAL — uv install location. # Runner pod is x86 but compute nodes are aarch64, and /mnt/home is @@ -279,6 +308,25 @@ else echo "Warning: Logs directory not found at $LOGS_DIR" fi +# Hand the per-node perfmon CSVs off to the downstream "Process result" step +# in benchmark-multinode-tmpl.yml. srt-slurm's perfmon (PR #35) writes +# perf_samples_{node}.csv straight into $LOGS_DIR on the host. process_result.py +# already invokes aggregate_power.run() inline; teaching it to read +# GPU_METRICS_CSV_GLOB lets utils/aggregate_power.py do the multi-CSV +# aggregation (each agg JSON gets avg_power_w / joules_per_*_token patched in +# place). Use an absolute glob because process_result.py runs from +# $GITHUB_WORKSPACE, not from this srt-slurm checkout. +if [ -d "$LOGS_DIR" ]; then + perf_glob_dir="$(pwd)/$LOGS_DIR" + perf_csv_count=$(ls "$perf_glob_dir"/perf_samples_*.csv 2>/dev/null | wc -l | tr -d ' ') + if [ "$perf_csv_count" -gt 0 ]; then + echo "[perfmon] Found $perf_csv_count per-node perf_samples_*.csv under $perf_glob_dir/" + echo "GPU_METRICS_CSV_GLOB=$perf_glob_dir/perf_samples_*.csv" >> "$GITHUB_ENV" + else + echo "[perfmon] WARNING: monitoring enabled but no perf_samples_*.csv found in $perf_glob_dir — measured power aggregation will be skipped" + fi +fi + if [[ "${EVAL_ONLY:-false}" != "true" ]]; then if [ ! -d "$LOGS_DIR" ]; then exit 1 diff --git a/utils/aggregate_power.py b/utils/aggregate_power.py index 3c204085a..3aa363c74 100644 --- a/utils/aggregate_power.py +++ b/utils/aggregate_power.py @@ -1,12 +1,19 @@ """Aggregate measured GPU power from a vendor SMI CSV into the agg result JSON. -Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi), -filters samples to the benchmark load window using start/end Unix timestamps -written by benchmark_serving.py, and patches two keys into the aggregated -result JSON consumed by InferenceX-app's ETL: +Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi) +or by srt-slurm's per-node perfmon (multinode), filters samples to the benchmark +load window using start/end Unix timestamps written by benchmark_serving.py, and +patches three keys into the aggregated result JSON consumed by InferenceX-app's +ETL: - avg_power_w: mean per-GPU power draw (W) during the load window - joules_per_output_token: (avg_power_w * num_gpus * duration_s) / total_output_tokens + - joules_per_total_token: same, divided by (input + output) tokens + +Multinode: accepts multiple CSV paths (one per worker node). GPU indices are +namespaced by source CSV stem to avoid the same-index collision across nodes — +e.g. 8 nodes each reporting indices 0..3 would otherwise be miscounted as 4 +total GPUs instead of 32. The ETL (`packages/db/src/etl/benchmark-mapper.ts`) auto-captures any numeric field in the agg JSON into the `metrics` JSONB column, so no schema migration @@ -14,8 +21,8 @@ Vendor schema detection is regex-based: any timestamp-like column + any column whose name contains "power" (excluding "limit"/"cap"/"max") is picked up. -NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version. Both are -handled. +NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version; srt-slurm's +perfmon emits "power_w". All are handled. This script is best-effort. Missing or malformed CSV exits 0 without patching so a monitoring hiccup never breaks the benchmark upload. @@ -25,9 +32,11 @@ import argparse import csv +import glob as glob_module import json import re import sys +from collections.abc import Iterable from datetime import datetime, timezone from pathlib import Path from statistics import mean @@ -39,6 +48,27 @@ _GPU_INDEX_COL_RE = re.compile(r"^(index|gpu|gpu_id|gpu_index|card|device)$", re.IGNORECASE) _NUMBER_RE = re.compile(r"-?\d+(?:\.\d+)?") +# Matches perf_samples__w_.csv as written by srt-slurm's +# perfmon (SemiAnalysisAI/srt-slurm:feat/inferencex-perfmon). Hostnames can +# contain underscores and digits, so the role and idx are anchored before the +# host portion. Old-format filenames (perf_samples_.csv) don't match +# and fall through to the unlabeled cluster-wide path. +_FILENAME_ROLE_RE = re.compile(r"^perf_samples_(?P[a-z]+)_w(?P\d+)_(?P.+)$") + + +def _parse_role_from_filename(path: Path) -> tuple[str | None, int | None]: + """Return (role, worker_idx) parsed from the CSV stem, or (None, None). + + Role is one of "prefill", "decode", "agg", "frontend" depending on what + srt-slurm's _start_perf_monitor labels the node with. Unlabeled filenames + (old format) return (None, None) so callers can treat them as cluster-wide + contributions without per-worker attribution. + """ + m = _FILENAME_ROLE_RE.match(path.stem) + if not m: + return None, None + return m.group("role"), int(m.group("idx")) + def _parse_timestamp(value: str) -> float | None: """Best-effort timestamp parse to Unix epoch seconds (local wall clock). @@ -109,74 +139,84 @@ def _detect_columns(header: list[str]) -> tuple[str | None, str | None, str | No def aggregate_power( - csv_path: Path, + csv_path: Path | Iterable[Path], start_unix: float, end_unix: float, ) -> tuple[float, int] | None: """Return (per_gpu_avg_power_w, num_gpus) for samples in [start, end]. - Returns None if the CSV is missing, empty, has no detectable power column, - or no rows fall in the window. + Accepts either a single Path (single-node case) or an iterable of Paths + (multinode case: one CSV per worker node, all written by srt-slurm's + perfmon). For multi-path inputs, GPU indices are namespaced by source + CSV stem so the distinct-id count reflects the true total — each node + independently reports indices 0..N, and without namespacing the union + would collapse to a single node's worth. + + Returns None if no CSVs are usable, none have a detectable power column, + or no rows fall in the window across all paths. """ - if not csv_path.is_file() or csv_path.stat().st_size == 0: - return None - if end_unix <= start_unix: + paths = [csv_path] if isinstance(csv_path, Path) else list(csv_path) + if not paths or end_unix <= start_unix: return None - try: - with csv_path.open("r", newline="", encoding="utf-8", errors="replace") as f: - reader = csv.DictReader(f, skipinitialspace=True) - header = [c.strip() for c in (reader.fieldnames or [])] - reader.fieldnames = header - timestamp_col, power_col, gpu_col = _detect_columns(header) - if not timestamp_col or not power_col: - return None - - # Group power readings by sample timestamp so per-sample total power - # (sum across GPUs) is computed correctly even if rows are interleaved. - # - # per_sample_row_count is the structural divisor: it's incremented for - # every contributing row regardless of whether a GPU-index column was - # detected. per_sample_gpus / gpu_keys are only populated when gpu_col - # is present and provide the canonical num_gpus via distinct-id count. - # When gpu_col is absent (vendor schema variant whose header doesn't - # match _GPU_INDEX_COL_RE), we fall back to inferring num_gpus from - # the modal row count per timestamp — assuming one row per GPU per - # sample, which is what every SMI tool we've seen actually emits. - per_sample_total: dict[float, float] = {} - per_sample_row_count: dict[float, int] = {} - per_sample_gpus: dict[float, set[str]] = {} - gpu_keys: set[str] = set() - - for row in reader: - ts_raw = (row.get(timestamp_col) or "").strip() - pw_raw = (row.get(power_col) or "").strip() - ts = _parse_timestamp(ts_raw) - pw = _parse_power(pw_raw) - if ts is None or pw is None: - continue - if ts < start_unix or ts > end_unix: + # Only namespace when there are multiple sources — keeps single-node + # gpu_keys identical to the pre-multinode behavior so existing callers + # see the same num_gpus values. + namespace = len(paths) > 1 + + # Per-sample state accumulates across ALL paths. Bucketed by ms-rounded + # timestamp so nodes whose clocks drift sub-ms still end up in the same + # bucket (they reliably do — all sample on `time.sleep(interval)` against + # the same NTP-synced cluster clock). + per_sample_total: dict[float, float] = {} + per_sample_row_count: dict[float, int] = {} + per_sample_gpus: dict[float, set[str]] = {} + gpu_keys: set[str] = set() + saw_gpu_col = False + + for path in paths: + if not path.is_file() or path.stat().st_size == 0: + continue + try: + with path.open("r", newline="", encoding="utf-8", errors="replace") as f: + reader = csv.DictReader(f, skipinitialspace=True) + header = [c.strip() for c in (reader.fieldnames or [])] + reader.fieldnames = header + timestamp_col, power_col, gpu_col = _detect_columns(header) + if not timestamp_col or not power_col: continue - # Bucket by sample timestamp (rounded to ms to absorb sub-ms drift). - bucket = round(ts, 3) - per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw - per_sample_row_count[bucket] = per_sample_row_count.get(bucket, 0) + 1 if gpu_col: - gpu_id = (row.get(gpu_col) or "").strip() - if gpu_id: - per_sample_gpus.setdefault(bucket, set()).add(gpu_id) - gpu_keys.add(gpu_id) - except (OSError, csv.Error): - return None + saw_gpu_col = True + + for row in reader: + ts_raw = (row.get(timestamp_col) or "").strip() + pw_raw = (row.get(power_col) or "").strip() + ts = _parse_timestamp(ts_raw) + pw = _parse_power(pw_raw) + if ts is None or pw is None: + continue + if ts < start_unix or ts > end_unix: + continue + bucket = round(ts, 3) + per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw + per_sample_row_count[bucket] = per_sample_row_count.get(bucket, 0) + 1 + if gpu_col: + gpu_id = (row.get(gpu_col) or "").strip() + if gpu_id: + ns_id = f"{path.stem}:{gpu_id}" if namespace else gpu_id + per_sample_gpus.setdefault(bucket, set()).add(ns_id) + gpu_keys.add(ns_id) + except (OSError, csv.Error): + continue if not per_sample_total: return None # Per-sample divisor and overall num_gpus. - # - If a GPU column was detected, trust distinct GPU IDs (correct for any - # sampling pattern, including hot-swap or partial visibility). - # - Otherwise, infer from row count (one row per GPU per sample). - if gpu_col and gpu_keys: + # - If any path exposed a GPU column, trust distinct (namespaced) GPU IDs. + # - Otherwise, infer from row count (one row per GPU per sample, summed + # across all paths' rows that fell into the same timestamp bucket). + if saw_gpu_col and gpu_keys: num_gpus = len(gpu_keys) per_sample_mean_per_gpu = [ total / max(len(per_sample_gpus.get(ts, ())), 1) @@ -190,11 +230,84 @@ def aggregate_power( return mean(per_sample_mean_per_gpu), num_gpus +def aggregate_power_per_worker( + csv_path: Path | Iterable[Path], + start_unix: float, + end_unix: float, +) -> dict | None: + """Aggregate measured power both cluster-wide and per worker. + + Returns a dict with: + + - cluster_avg_power_w: same number as aggregate_power's first tuple element + - cluster_num_gpus: same as aggregate_power's second tuple element + - workers: list of {role, worker_idx, num_gpus, avg_power_w} + dicts — one per (role, worker_idx) group derived + from CSV filenames. Empty list when no filenames + match the labeled format (single-node single-CSV + input, or older perfmon writing unlabeled paths). + + Worker grouping is by filename: each path's role + worker_idx are parsed + from ``perf_samples__w_.csv``. Multiple CSVs sharing the + same (role, worker_idx) — e.g. a multi-node TP=16 worker spanning 4 nodes — + aggregate together as one worker. Unlabeled paths are silently dropped + from the per-worker output but still contribute to the cluster-wide + average via the underlying aggregate_power call. + + Returns None when the cluster-wide aggregation returns None. + """ + paths = [csv_path] if isinstance(csv_path, Path) else list(csv_path) + if not paths or end_unix <= start_unix: + return None + + cluster = aggregate_power(paths, start_unix, end_unix) + if cluster is None: + return None + cluster_avg, cluster_n = cluster + + # Group paths by (role, worker_idx); silently skip files whose names + # don't match the labeled format. + groups: dict[tuple[str, int], list[Path]] = {} + for p in paths: + role, idx = _parse_role_from_filename(p) + if role is None or idx is None: + continue + groups.setdefault((role, idx), []).append(p) + + workers: list[dict] = [] + for (role, idx), group_paths in sorted(groups.items()): + result = aggregate_power(group_paths, start_unix, end_unix) + if result is None: + continue + avg, n = result + workers.append({ + "role": role, + "worker_idx": idx, + "num_gpus": n, + "avg_power_w": round(avg, 3), + }) + + return { + "cluster_avg_power_w": cluster_avg, + "cluster_num_gpus": cluster_n, + "workers": workers, + } + + def _load_bench_window( bench_result_path: Path, ) -> tuple[float, float, float, int, int] | None: """Read (start_unix, end_unix, duration_s, total_output_tokens, total_input_tokens) - from the raw bench JSON. Returns None if any required field is missing. + from the raw bench JSON. Returns None if a window cannot be resolved. + + Window resolution order, tried in turn: + 1. benchmark_start_time_unix + benchmark_end_time_unix (our benchmark_serving.py + writes both — single-node, brackets the actual load window exactly). + 2. date + duration (srt-slurm sa-bench writes "YYYYMMDD-HHMMSS" UTC as the + result write time — multinode; treat as bench end and subtract duration + for start. Overshoots by post-bench JSON serialization, typically <5s). + 3. file mtime + duration (last resort if `date` is absent or unparseable — + same end-of-bench proxy as #2 via the result file's mtime). total_input_tokens defaults to 0 if absent (older bench JSONs may not have it); this only degrades joules_per_total_token to equal joules_per_output_token in @@ -204,18 +317,52 @@ def _load_bench_window( bench = json.loads(bench_result_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return None - start = bench.get("benchmark_start_time_unix") - end = bench.get("benchmark_end_time_unix") duration = bench.get("duration") total_output = bench.get("total_output_tokens") total_input = bench.get("total_input_tokens", 0) - if not all(isinstance(v, (int, float)) for v in (start, end, duration)): + if not isinstance(duration, (int, float)): return None if not isinstance(total_output, int) or total_output <= 0: return None if not isinstance(total_input, int) or total_input < 0: total_input = 0 - return float(start), float(end), float(duration), int(total_output), int(total_input) + + # Tier 1: explicit Unix timestamps (single-node bench_serving.py). + start = bench.get("benchmark_start_time_unix") + end = bench.get("benchmark_end_time_unix") + if isinstance(start, (int, float)) and isinstance(end, (int, float)): + return float(start), float(end), float(duration), int(total_output), int(total_input) + + # Tier 2: parse `date` field (srt-slurm sa-bench multinode). On observed + # runs the string matches file mtime to the second, confirming it's the + # JSON write time. + date_str = bench.get("date") + if isinstance(date_str, str): + try: + end_dt = datetime.strptime(date_str, "%Y%m%d-%H%M%S").replace(tzinfo=timezone.utc) + end_unix = end_dt.timestamp() + return ( + float(end_unix - duration), + float(end_unix), + float(duration), + int(total_output), + int(total_input), + ) + except ValueError: + pass + + # Tier 3: file mtime as last-resort bench-end proxy. + try: + end_unix = bench_result_path.stat().st_mtime + except OSError: + return None + return ( + float(end_unix - duration), + float(end_unix), + float(duration), + int(total_output), + int(total_input), + ) def patch_agg_result( @@ -223,18 +370,27 @@ def patch_agg_result( avg_power_w: float, joules_per_output_token: float, joules_per_total_token: float, + extras: dict | None = None, ) -> None: - """Read the agg JSON, add the three power keys, and write it back atomically.""" + """Read the agg JSON, add the three base power keys + any extras, write back atomically. + + ``extras`` is merged after the base three keys. Used for per-worker + breakdowns (``workers``) and role-split energy metrics + (``joules_per_input_token``, ``joules_per_output_token_decode``, + ``prefill_avg_power_w``, ``decode_avg_power_w``) on disagg runs. + """ data = json.loads(agg_path.read_text(encoding="utf-8")) data["avg_power_w"] = round(avg_power_w, 3) data["joules_per_output_token"] = round(joules_per_output_token, 6) data["joules_per_total_token"] = round(joules_per_total_token, 6) + if extras: + data.update(extras) tmp_path = agg_path.with_suffix(agg_path.suffix + ".tmp") tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8") tmp_path.replace(agg_path) -def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int: +def run(csv_path: Path | Iterable[Path], bench_result: Path, agg_result: Path) -> int: window = _load_bench_window(bench_result) if window is None: print( @@ -244,19 +400,21 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int: return 0 start, end, duration, total_output, total_input = window - result = aggregate_power(csv_path, start, end) + paths = [csv_path] if isinstance(csv_path, Path) else list(csv_path) + result = aggregate_power_per_worker(paths, start, end) if result is None: + label = str(paths[0]) if len(paths) == 1 else f"{len(paths)} CSVs" print( - f"[aggregate_power] No usable power samples in {csv_path} for " + f"[aggregate_power] No usable power samples in {label} for " f"window [{start}, {end}] — skipping", file=sys.stderr, ) return 0 - avg_power_w, num_gpus = result + avg_power_w = result["cluster_avg_power_w"] + num_gpus = result["cluster_num_gpus"] + workers = result["workers"] - # Joules consumed by the system during the bench window, divided by either - # output tokens (for generation-cost metrics) or all tokens (for whole- - # workload efficiency). + # Cluster-wide energy and per-token metrics (existing behavior, unchanged). total_system_energy_j = avg_power_w * num_gpus * duration joules_per_output_token = total_system_energy_j / total_output total_tokens = total_output + total_input @@ -264,6 +422,36 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int: total_system_energy_j / total_tokens if total_tokens > 0 else joules_per_output_token ) + # Per-role breakdown — only emitted when filenames had role labels (i.e. + # srt-slurm's perfmon was the source). Role-split energy is only meaningful + # for disagg runs (both prefill and decode workers present); aggregated + # runs and frontend-only nodes don't contribute to the role-split fields. + extras: dict = {} + if workers: + extras["workers"] = workers + prefill = [w for w in workers if w["role"] == "prefill"] + decode = [w for w in workers if w["role"] == "decode"] + if prefill and decode: + prefill_gpus = sum(w["num_gpus"] for w in prefill) + decode_gpus = sum(w["num_gpus"] for w in decode) + prefill_energy_j = sum( + w["avg_power_w"] * w["num_gpus"] * duration for w in prefill + ) + decode_energy_j = sum( + w["avg_power_w"] * w["num_gpus"] * duration for w in decode + ) + extras["prefill_avg_power_w"] = round( + sum(w["avg_power_w"] * w["num_gpus"] for w in prefill) / prefill_gpus, 3 + ) + extras["decode_avg_power_w"] = round( + sum(w["avg_power_w"] * w["num_gpus"] for w in decode) / decode_gpus, 3 + ) + if total_input > 0: + extras["joules_per_input_token"] = round(prefill_energy_j / total_input, 6) + extras["joules_per_output_token_decode"] = round( + decode_energy_j / total_output, 6 + ) + if not agg_result.is_file(): print( f"[aggregate_power] Agg result {agg_result} missing — cannot patch", @@ -273,29 +461,47 @@ def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int: try: patch_agg_result( - agg_result, avg_power_w, joules_per_output_token, joules_per_total_token + agg_result, + avg_power_w, + joules_per_output_token, + joules_per_total_token, + extras=extras, ) except (OSError, json.JSONDecodeError) as exc: print(f"[aggregate_power] Failed to patch {agg_result}: {exc}", file=sys.stderr) return 0 + role_summary = ( + f" prefill={extras['prefill_avg_power_w']:.0f}W decode={extras['decode_avg_power_w']:.0f}W" + if "prefill_avg_power_w" in extras and "decode_avg_power_w" in extras + else "" + ) print( - f"[aggregate_power] avg_power_w={avg_power_w:.2f} (per GPU, n={num_gpus}) " + f"[aggregate_power] avg_power_w={avg_power_w:.2f} (per GPU, n={num_gpus}){role_summary} " f"joules_per_output_token={joules_per_output_token:.4f} " f"joules_per_total_token={joules_per_total_token:.4f} " f"duration={duration:.1f}s output_tokens={total_output} input_tokens={total_input} " - f"-> {agg_result}" + f"workers={len(workers)} -> {agg_result}" ) return 0 def main() -> int: parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) - parser.add_argument( + source = parser.add_mutually_exclusive_group() + source.add_argument( "--csv", type=Path, - default=Path("/workspace/gpu_metrics.csv"), - help="Path to gpu_metrics.csv from start_gpu_monitor (default: /workspace/gpu_metrics.csv)", + default=None, + help="Single gpu_metrics.csv from start_gpu_monitor (single-node). " + "Falls back to /workspace/gpu_metrics.csv when neither --csv nor --csv-glob is set.", + ) + source.add_argument( + "--csv-glob", + type=str, + default=None, + help="Shell glob expanding to per-node perf_samples_*.csv files (multinode, " + "written by srt-slurm's perfmon). GPU indices are namespaced by source CSV stem.", ) parser.add_argument( "--bench-result", @@ -310,7 +516,17 @@ def main() -> int: help="Path to the agg_.json output of process_result.py (will be patched in place)", ) args = parser.parse_args() - return run(args.csv, args.bench_result, args.agg_result) + + if args.csv_glob: + paths = sorted(Path(p) for p in glob_module.glob(args.csv_glob)) + if not paths: + print( + f"[aggregate_power] No CSVs matched glob {args.csv_glob!r} — skipping", + file=sys.stderr, + ) + return 0 + return run(paths, args.bench_result, args.agg_result) + return run(args.csv or Path("/workspace/gpu_metrics.csv"), args.bench_result, args.agg_result) if __name__ == "__main__": diff --git a/utils/process_result.py b/utils/process_result.py index 5fb059473..0510fe023 100644 --- a/utils/process_result.py +++ b/utils/process_result.py @@ -139,20 +139,41 @@ def get_required_env_vars(required_vars): # Best-effort: patch measured power into the agg JSON. Never fails the run. try: + import glob as _glob_module from aggregate_power import run as _aggregate_power_run - _csv_candidates = [ - os.environ.get('GPU_METRICS_CSV'), - 'gpu_metrics.csv', - '/workspace/gpu_metrics.csv', - ] - _csv_path = next( - (Path(p) for p in _csv_candidates if p and Path(p).is_file()), - None, - ) - if _csv_path is not None: + # Multinode path: srt-slurm launchers set GPU_METRICS_CSV_GLOB after the job + # to a shell glob expanding to one perf_samples_.csv per worker node. + # Takes precedence over the single-CSV fallback — if the launcher set the + # glob, the run was multinode and there is no single-CSV fallback to make. + _csv_arg = None + _glob_pattern = os.environ.get('GPU_METRICS_CSV_GLOB') + if _glob_pattern: + _matched = sorted(Path(p) for p in _glob_module.glob(_glob_pattern)) + if _matched: + _csv_arg = _matched + else: + print( + f'[process_result] GPU_METRICS_CSV_GLOB={_glob_pattern!r} matched no files', + file=sys.stderr, + ) + + if _csv_arg is None: + # Single-node path: gpu_metrics.csv written by start_gpu_monitor in the + # bench container. + _csv_candidates = [ + os.environ.get('GPU_METRICS_CSV'), + 'gpu_metrics.csv', + '/workspace/gpu_metrics.csv', + ] + _csv_arg = next( + (Path(p) for p in _csv_candidates if p and Path(p).is_file()), + None, + ) + + if _csv_arg is not None: _aggregate_power_run( - csv_path=_csv_path, + csv_path=_csv_arg, bench_result=Path(f'{result_filename}.json'), agg_result=agg_path, ) diff --git a/utils/test_aggregate_power.py b/utils/test_aggregate_power.py index bf81ee7b1..c57e55d9d 100644 --- a/utils/test_aggregate_power.py +++ b/utils/test_aggregate_power.py @@ -15,7 +15,7 @@ import json import sys -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path import pytest @@ -25,8 +25,10 @@ from aggregate_power import ( # noqa: E402 _detect_columns, _parse_power, + _parse_role_from_filename, _parse_timestamp, aggregate_power, + aggregate_power_per_worker, patch_agg_result, run, ) @@ -445,3 +447,432 @@ def test_patch_agg_result_is_atomic_via_tempfile(tmp_path: Path): assert data["joules_per_total_token"] == 0.5 # No .tmp leftover. assert not (tmp_path / "agg.json.tmp").exists() + + +# --------------------------------------------------------------------------- # +# Multi-node CSV aggregation +# --------------------------------------------------------------------------- # + + +def test_aggregate_power_multi_node_namespaces_local_gpu_indices(tmp_path: Path): + """Two per-node CSVs each report local GPU indices 0..3. + + Without per-source namespacing the union of gpu_keys would collapse to 4 + instead of 8 — the bug this whole multinode change exists to prevent.""" + base = 1_700_000_000.0 + node1 = tmp_path / "perf_samples_node1.csv" + node2 = tmp_path / "perf_samples_node2.csv" + _write_nvidia_csv(node1, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + _write_nvidia_csv(node2, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + result = aggregate_power([node1, node2], base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 8 + + +def test_aggregate_power_multi_node_with_sub_second_clock_drift(tmp_path: Path): + """Per-node polls drift sub-second even on NTP-synced clusters. + + Node1 polls at base+s, node2 at base+s+0.3 — rows land in different ms + buckets. Each bucket is then a single-node 4-GPU slice averaging to 500W, + and the mean across all buckets is the cluster per-GPU mean.""" + base = 1_700_000_000.0 + node1 = tmp_path / "perf_samples_node1.csv" + node2 = tmp_path / "perf_samples_node2.csv" + _write_nvidia_csv(node1, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + _write_nvidia_csv(node2, [(base + s + 0.3, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + result = aggregate_power([node1, node2], base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 8 + + +def test_aggregate_power_multi_node_asymmetric_prefill_decode_power(tmp_path: Path): + """Disagg topologies draw different per-GPU power on prefill vs decode nodes. + + 4 prefill GPUs at 600W + 4 decode GPUs at 400W: cluster mean is the + weighted average across all 8 GPUs = (4*600 + 4*400)/8 = 500W.""" + base = 1_700_000_000.0 + prefill = tmp_path / "perf_samples_prefill0.csv" + decode = tmp_path / "perf_samples_decode0.csv" + _write_nvidia_csv(prefill, [(base + s, gpu, 600.0) for s in range(3) for gpu in range(4)]) + _write_nvidia_csv(decode, [(base + s, gpu, 400.0) for s in range(3) for gpu in range(4)]) + + result = aggregate_power([prefill, decode], base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 8 + + +def test_aggregate_power_multi_node_skips_missing_csv_silently(tmp_path: Path): + """If a node failed to start perfmon, its CSV will be absent. + + Aggregating over the remaining nodes is preferable to returning None — + losing one node's power data should not zero out the whole metric.""" + base = 1_700_000_000.0 + present = tmp_path / "perf_samples_node1.csv" + missing = tmp_path / "perf_samples_node2.csv" # never written + _write_nvidia_csv(present, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + result = aggregate_power([present, missing], base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 4 # only the node that emitted data + + +def test_aggregate_power_single_path_in_list_matches_bare_path(tmp_path: Path): + """Backward compat: aggregate_power([csv], ...) == aggregate_power(csv, ...). + + Single-source behavior must not change when the caller wraps the path in a + list — otherwise process_result.py-style callers that defensively normalize + to a list would see different num_gpus values than legacy bare-path calls.""" + base = 1_700_000_000.0 + csv = tmp_path / "gpu_metrics.csv" + _write_nvidia_csv(csv, [(base + s, gpu, 500.0) for s in range(3) for gpu in range(8)]) + + bare = aggregate_power(csv, base, base + 10) + listed = aggregate_power([csv], base, base + 10) + assert bare == listed + assert bare == (pytest.approx(500.0), 8) + + +def test_aggregate_power_accepts_iterable_not_just_list(tmp_path: Path): + """Signature is Iterable[Path] — generators (e.g. Path.glob()) must work.""" + base = 1_700_000_000.0 + node1 = tmp_path / "perf_samples_node1.csv" + node2 = tmp_path / "perf_samples_node2.csv" + _write_nvidia_csv(node1, [(base + s, gpu, 500.0) for s in range(2) for gpu in range(4)]) + _write_nvidia_csv(node2, [(base + s, gpu, 500.0) for s in range(2) for gpu in range(4)]) + + result = aggregate_power(tmp_path.glob("perf_samples_*.csv"), base, base + 10) + assert result is not None + _, num_gpus = result + assert num_gpus == 8 + + +def test_run_multi_node_e2e_computes_joules_from_total_gpus(tmp_path: Path): + """End-to-end multinode: run() with a list of CSVs patches the agg JSON. + + 8 GPUs total at 500W for 10s → 40_000 J → 2.0 J/output_token for 20_000 tokens.""" + base = 1_700_000_000.0 + node1 = tmp_path / "perf_samples_node1.csv" + node2 = tmp_path / "perf_samples_node2.csv" + _write_nvidia_csv(node1, [(base + 1 + s, gpu, 500.0) for s in range(2) for gpu in range(4)]) + _write_nvidia_csv(node2, [(base + 1 + s, gpu, 500.0) for s in range(2) for gpu in range(4)]) + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result(bench, start=base, end=base + 10, duration=10.0, total_output=20_000) + agg.write_text(json.dumps({"hw": "gb300", "conc": 8192}), encoding="utf-8") + + exit_code = run([node1, node2], bench, agg) + assert exit_code == 0 + + patched = json.loads(agg.read_text()) + assert patched["avg_power_w"] == pytest.approx(500.0) + assert patched["joules_per_output_token"] == pytest.approx(2.0) + + +def test_run_multi_node_skips_when_all_csvs_missing(tmp_path: Path): + """Entire monitoring failure (all per-node CSVs absent) skips cleanly without patching.""" + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result(bench, start=0.0, end=10.0, duration=10.0, total_output=1000) + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + + exit_code = run([tmp_path / "absent1.csv", tmp_path / "absent2.csv"], bench, agg) + assert exit_code == 0 + + patched = json.loads(agg.read_text()) + assert "avg_power_w" not in patched + + +# --------------------------------------------------------------------------- # +# _load_bench_window fallbacks for srt-slurm multinode result JSONs +# +# srt-slurm's sa-bench result writer emits `date` + `duration` but NOT the +# benchmark_*_time_unix fields our single-node benchmark_serving.py adds. +# Without a fallback, multinode runs would always hit "No bench window in +# {bench_result}" and silently skip power aggregation end-to-end. +# --------------------------------------------------------------------------- # + + +def test_run_uses_date_field_when_unix_timestamps_absent(tmp_path: Path): + """Tier 2: parse `date` ("YYYYMMDD-HHMMSS" UTC) + `duration` for the window.""" + # End of bench at a known UTC instant; CSV samples land in [end-10, end]. + end_unix = datetime(2026, 5, 20, 3, 10, 29, tzinfo=timezone.utc).timestamp() + csv = tmp_path / "perf_samples_node0.csv" + _write_nvidia_csv(csv, [(end_unix - 1 - s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + bench = tmp_path / "bench.json" + bench.write_text( + json.dumps( + { + "date": "20260520-031029", + "duration": 10.0, + "total_output_tokens": 1000, + "total_input_tokens": 8000, + } + ), + encoding="utf-8", + ) + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + + assert run([csv], bench, agg) == 0 + patched = json.loads(agg.read_text()) + assert patched["avg_power_w"] == pytest.approx(500.0) + # 4 GPUs × 500W × 10s = 20_000 J / 1000 output tokens = 20.0 J/output_token. + assert patched["joules_per_output_token"] == pytest.approx(20.0) + # 20_000 J / (1000 + 8000) total tokens ≈ 2.222 J/total_token. + assert patched["joules_per_total_token"] == pytest.approx(20_000 / 9_000) + + +def test_run_uses_mtime_when_date_unparseable(tmp_path: Path): + """Tier 3a: malformed `date` falls through to file mtime as bench-end proxy.""" + csv = tmp_path / "perf_samples_node0.csv" + bench = tmp_path / "bench.json" + bench.write_text( + json.dumps({"date": "not-a-date", "duration": 10.0, "total_output_tokens": 1000}), + encoding="utf-8", + ) + # CSV samples bracket bench file's mtime so they fall inside the derived window. + end_unix = bench.stat().st_mtime + _write_nvidia_csv(csv, [(end_unix - 1 - s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + assert run([csv], bench, agg) == 0 + patched = json.loads(agg.read_text()) + assert patched["avg_power_w"] == pytest.approx(500.0) + + +def test_run_uses_mtime_when_no_date_field(tmp_path: Path): + """Tier 3b: bench JSON has only `duration` → file mtime is end-of-bench.""" + csv = tmp_path / "perf_samples_node0.csv" + bench = tmp_path / "bench.json" + bench.write_text( + json.dumps({"duration": 10.0, "total_output_tokens": 1000}), + encoding="utf-8", + ) + end_unix = bench.stat().st_mtime + _write_nvidia_csv(csv, [(end_unix - 1 - s, gpu, 500.0) for s in range(3) for gpu in range(4)]) + + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + assert run([csv], bench, agg) == 0 + patched = json.loads(agg.read_text()) + assert patched["avg_power_w"] == pytest.approx(500.0) + + +def test_run_skips_when_duration_missing(tmp_path: Path): + """No tier can resolve a window without `duration` — skip cleanly.""" + csv = tmp_path / "perf_samples_node0.csv" + _write_nvidia_csv(csv, [(1_700_000_000.0, 0, 400.0)]) + bench = tmp_path / "bench.json" + bench.write_text(json.dumps({"total_output_tokens": 1000}), encoding="utf-8") + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + + assert run([csv], bench, agg) == 0 + assert "avg_power_w" not in json.loads(agg.read_text()) + + +# --------------------------------------------------------------------------- # +# Per-worker aggregation (prefill/decode role labels from filename) +# --------------------------------------------------------------------------- # + + +def test_parse_role_from_filename_labeled(tmp_path: Path): + role, idx = _parse_role_from_filename( + tmp_path / "perf_samples_prefill_w0_slurm-gb300-133-181.csv" + ) + assert role == "prefill" + assert idx == 0 + + +def test_parse_role_from_filename_decode_multidigit_idx(tmp_path: Path): + role, idx = _parse_role_from_filename(tmp_path / "perf_samples_decode_w12_node-xyz.csv") + assert role == "decode" + assert idx == 12 + + +def test_parse_role_from_filename_old_format_returns_none(tmp_path: Path): + """Backward compat: unlabeled filenames return (None, None) so callers + fall through to cluster-wide-only aggregation.""" + assert _parse_role_from_filename(tmp_path / "gpu_metrics.csv") == (None, None) + assert _parse_role_from_filename(tmp_path / "perf_samples_node1.csv") == (None, None) + + +def test_aggregate_power_per_worker_groups_disagg_topology(tmp_path: Path): + """2 prefill workers (single-node, 4 GPUs each) + 1 decode worker (4 nodes + × 4 GPUs = 16 GPUs total). Verifies workers list correctly attributes + GPUs and power to each (role, worker_idx), including multi-node aggregation + for the decode worker.""" + base = 1_700_000_000.0 + for w_idx, host in enumerate(["host1", "host2"]): + _write_nvidia_csv( + tmp_path / f"perf_samples_prefill_w{w_idx}_{host}.csv", + [(base + s, gpu, 600.0) for s in range(3) for gpu in range(4)], + ) + for host in ["host3", "host4", "host5", "host6"]: + _write_nvidia_csv( + tmp_path / f"perf_samples_decode_w0_{host}.csv", + [(base + s, gpu, 400.0) for s in range(3) for gpu in range(4)], + ) + + csvs = sorted(tmp_path.glob("perf_samples_*.csv")) + result = aggregate_power_per_worker(csvs, base, base + 10) + assert result is not None + # Cluster: 8 prefill + 16 decode = 24 GPUs; weighted avg = (8*600+16*400)/24 ≈ 466.67 + assert result["cluster_num_gpus"] == 24 + assert result["cluster_avg_power_w"] == pytest.approx(466.667, abs=0.5) + + by_key = {(w["role"], w["worker_idx"]): w for w in result["workers"]} + assert len(by_key) == 3 + assert by_key[("prefill", 0)]["num_gpus"] == 4 + assert by_key[("prefill", 0)]["avg_power_w"] == pytest.approx(600.0) + assert by_key[("prefill", 1)]["num_gpus"] == 4 + assert by_key[("decode", 0)]["num_gpus"] == 16 # multi-node decode worker + assert by_key[("decode", 0)]["avg_power_w"] == pytest.approx(400.0) + + +def test_aggregate_power_per_worker_unlabeled_files_empty_workers(tmp_path: Path): + """Old single-CSV format: cluster aggregation works, workers list is empty.""" + base = 1_700_000_000.0 + _write_nvidia_csv( + tmp_path / "gpu_metrics.csv", + [(base + s, gpu, 500.0) for s in range(3) for gpu in range(8)], + ) + + result = aggregate_power_per_worker(tmp_path / "gpu_metrics.csv", base, base + 10) + assert result is not None + assert result["cluster_num_gpus"] == 8 + assert result["cluster_avg_power_w"] == pytest.approx(500.0) + assert result["workers"] == [] + + +def test_aggregate_power_per_worker_mixed_labeled_and_unlabeled(tmp_path: Path): + """Cluster-wide aggregation includes everything; workers list only labels.""" + base = 1_700_000_000.0 + _write_nvidia_csv( + tmp_path / "gpu_metrics.csv", + [(base + s, gpu, 500.0) for s in range(3) for gpu in range(4)], + ) + _write_nvidia_csv( + tmp_path / "perf_samples_prefill_w0_node1.csv", + [(base + s, gpu, 600.0) for s in range(3) for gpu in range(4)], + ) + + result = aggregate_power_per_worker(sorted(tmp_path.glob("*.csv")), base, base + 10) + assert result is not None + assert result["cluster_num_gpus"] == 8 + assert len(result["workers"]) == 1 + assert result["workers"][0]["role"] == "prefill" + + +def test_run_disagg_emits_role_split_joules_and_per_worker_breakdown(tmp_path: Path): + """End-to-end disagg: agg JSON gets cluster-wide + per-role + per-worker fields. + + 1 prefill worker (4 GPUs at 600W) + 1 decode worker (4 GPUs at 400W). + Cluster: 8 GPUs avg 500W over 10s → 40_000 J. + Prefill energy: 600 * 4 * 10 = 24_000 J ; J/input = 24000/160000 = 0.15. + Decode energy: 400 * 4 * 10 = 16_000 J ; J/output_decode = 16000/20000 = 0.8. + """ + base = 1_700_000_000.0 + _write_nvidia_csv( + tmp_path / "perf_samples_prefill_w0_host1.csv", + [(base + 1 + s, gpu, 600.0) for s in range(2) for gpu in range(4)], + ) + _write_nvidia_csv( + tmp_path / "perf_samples_decode_w0_host2.csv", + [(base + 1 + s, gpu, 400.0) for s in range(2) for gpu in range(4)], + ) + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result( + bench, start=base, end=base + 10, duration=10.0, + total_output=20_000, total_input=160_000, + ) + agg.write_text(json.dumps({"hw": "gb300"}), encoding="utf-8") + + assert run(sorted(tmp_path.glob("perf_samples_*.csv")), bench, agg) == 0 + patched = json.loads(agg.read_text()) + + # Cluster-wide fields preserved with the same semantics as the basic aggregator. + assert patched["avg_power_w"] == pytest.approx(500.0, abs=0.5) + assert patched["joules_per_output_token"] == pytest.approx(2.0, abs=0.01) + assert patched["joules_per_total_token"] == pytest.approx(40000 / 180000, abs=0.001) + + # New per-role scalars. + assert patched["prefill_avg_power_w"] == pytest.approx(600.0, abs=0.5) + assert patched["decode_avg_power_w"] == pytest.approx(400.0, abs=0.5) + + # New role-split joules. + assert patched["joules_per_input_token"] == pytest.approx(0.15, abs=0.001) + assert patched["joules_per_output_token_decode"] == pytest.approx(0.8, abs=0.001) + + # Nested workers array — frontend consumers can render prefill/decode separately. + by_role = {w["role"]: w for w in patched["workers"]} + assert by_role["prefill"]["num_gpus"] == 4 + assert by_role["prefill"]["worker_idx"] == 0 + assert by_role["decode"]["num_gpus"] == 4 + + +def test_run_aggregated_topology_omits_role_split_fields(tmp_path: Path): + """Non-disagg runs: workers list still emitted (with role='agg') but + prefill/decode-specific scalars and J/input absent because there is no + prefill workload to attribute input-token energy to.""" + base = 1_700_000_000.0 + _write_nvidia_csv( + tmp_path / "perf_samples_agg_w0_host1.csv", + [(base + 1 + s, gpu, 500.0) for s in range(2) for gpu in range(8)], + ) + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result(bench, start=base, end=base + 10, duration=10.0, total_output=20_000) + agg.write_text(json.dumps({"hw": "h200"}), encoding="utf-8") + + assert run([tmp_path / "perf_samples_agg_w0_host1.csv"], bench, agg) == 0 + patched = json.loads(agg.read_text()) + + assert "avg_power_w" in patched + assert "joules_per_output_token" in patched + assert patched["workers"][0]["role"] == "agg" + + for k in ( + "prefill_avg_power_w", + "decode_avg_power_w", + "joules_per_input_token", + "joules_per_output_token_decode", + ): + assert k not in patched, f"{k} should not be set for non-disagg runs" + + +def test_run_old_format_omits_workers_field_entirely(tmp_path: Path): + """Bit-for-bit backward compat for legacy single-CSV callers + (single-node start_gpu_monitor path). Agg JSON gets ONLY the original + three power keys; no workers list, no role split.""" + base = 1_700_000_000.0 + _write_nvidia_csv( + tmp_path / "gpu_metrics.csv", + [(base + 1 + s, gpu, 500.0) for s in range(2) for gpu in range(8)], + ) + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result(bench, start=base, end=base + 10, duration=10.0, total_output=20_000) + agg.write_text(json.dumps({"hw": "h200"}), encoding="utf-8") + + assert run(tmp_path / "gpu_metrics.csv", bench, agg) == 0 + patched = json.loads(agg.read_text()) + + assert patched["avg_power_w"] == pytest.approx(500.0) + assert patched["joules_per_output_token"] == pytest.approx(2.0) + for k in ("workers", "prefill_avg_power_w", "decode_avg_power_w", + "joules_per_input_token", "joules_per_output_token_decode"): + assert k not in patched, f"{k} should not appear for old-format callers" diff --git a/utils/test_process_result.py b/utils/test_process_result.py index 4037689ea..61d3b45fc 100644 --- a/utils/test_process_result.py +++ b/utils/test_process_result.py @@ -649,3 +649,108 @@ def test_missing_bench_timestamps_does_not_patch(self, tmp_path, single_node_env patched = json.loads(agg_path.read_text()) assert "avg_power_w" not in patched assert "joules_per_output_token" not in patched + + def test_multinode_csv_glob_aggregates_across_per_node_csvs(self, tmp_path, single_node_env_vars): + """Multinode wiring: srt-slurm launchers set GPU_METRICS_CSV_GLOB to a + shell glob expanding to one perf_samples_.csv per worker node. + process_result.py must expand it and hand the list to the aggregator, + which namespaces local GPU indices per source so they don't collide. + + Without this bridge the launcher would set the env var, process_result.py + would ignore it (fall back to a non-existent /workspace/gpu_metrics.csv), + and the chart would silently show no power data — the failure mode that + motivated catching this in the contract check.""" + start, end = 1_700_000_100.0, 1_700_000_160.0 # 60s bench window + # Two per-node CSVs at the same local indices 0-3. Without per-source + # namespacing the union would collapse to 4 GPUs instead of 8. + self._write_nvidia_csv( + tmp_path / "perf_samples_node1.csv", start, end, watts_per_gpu=600.0, num_gpus=4 + ) + self._write_nvidia_csv( + tmp_path / "perf_samples_node2.csv", start, end, watts_per_gpu=600.0, num_gpus=4 + ) + + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + "benchmark_start_time_unix": start, + "benchmark_end_time_unix": end, + "duration": 60.0, + "total_output_tokens": 30_000, + } + env = { + **single_node_env_vars, + "GPU_METRICS_CSV_GLOB": str(tmp_path / "perf_samples_*.csv"), + } + + result = run_script(tmp_path, env, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + patched = json.loads(agg_path.read_text()) + # 2 nodes × 4 GPUs = 8 total. Per-GPU mean stays at 600W. + assert patched["avg_power_w"] == pytest.approx(600.0, abs=0.5) + # 600W × 8 GPUs × 60s / 30_000 tokens = 9.6 J/tok. + # If namespacing failed we'd see ~4.8 (only 4 GPUs counted). + assert patched["joules_per_output_token"] == pytest.approx(9.6, abs=0.05) + + def test_multinode_csv_glob_takes_precedence_over_single_csv(self, tmp_path, single_node_env_vars): + """If both GLOB and single CSV are set, the glob wins. + + Reflects the ownership split: the multinode launcher sets the glob + after the job, while the single CSV env var is only meaningful for + single-node runs. If a stale single-CSV value leaks through (e.g. a + runner with persistent env), the glob should still take precedence.""" + start, end = 1_700_000_100.0, 1_700_000_160.0 + glob_csv = tmp_path / "perf_samples_node1.csv" + stale_csv = tmp_path / "stale_single.csv" + self._write_nvidia_csv(glob_csv, start, end, watts_per_gpu=600.0, num_gpus=4) + self._write_nvidia_csv(stale_csv, start, end, watts_per_gpu=100.0, num_gpus=1) + + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + "benchmark_start_time_unix": start, + "benchmark_end_time_unix": end, + "duration": 60.0, + "total_output_tokens": 30_000, + } + env = { + **single_node_env_vars, + "GPU_METRICS_CSV_GLOB": str(tmp_path / "perf_samples_*.csv"), + "GPU_METRICS_CSV": str(stale_csv), + } + + result = run_script(tmp_path, env, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + patched = json.loads(agg_path.read_text()) + # Glob respected → 600W (4 GPUs). Stale fallback would give 100W (1 GPU). + assert patched["avg_power_w"] == pytest.approx(600.0, abs=0.5) + + def test_multinode_csv_glob_empty_match_falls_through_silently(self, tmp_path, single_node_env_vars): + """If GPU_METRICS_CSV_GLOB is set but matches no files (perfmon failed + to start on any node), process_result.py still succeeds and writes the + agg JSON without power fields. The run must not block on telemetry.""" + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + } + env = { + **single_node_env_vars, + "GPU_METRICS_CSV_GLOB": str(tmp_path / "perf_samples_*.csv"), + } + + result = run_script(tmp_path, env, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + patched = json.loads(agg_path.read_text()) + assert "avg_power_w" not in patched