From fa44c1961929ea61cba9b469a838173171d3c485 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Mon, 16 Feb 2026 15:18:51 +0000 Subject: [PATCH 1/8] fix: remove typo in log for requeued jobs Signed-off-by: Alex Jones --- src/dvsim/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index 766778a..3a5b939 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -559,7 +559,7 @@ def _dispatch_job(self, hms: str, target: str, job_name: str) -> None: self._queued[target].append(job_name) log.verbose( - "[%s]: [%s]: [reqeued]: %s", + "[%s]: [%s]: [requeued]: %s", hms, target, job_name, From dc2b3576bacaf4cf7ea9e03691c07dc64eb6c6d2 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Mon, 16 Feb 2026 14:56:25 +0000 Subject: [PATCH 2/8] ci: disable RUF100 unused-noqa check in CI Signed-off-by: Alex Jones --- ruff-ci.toml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ruff-ci.toml b/ruff-ci.toml index 9f59e10..c4f0517 100644 --- a/ruff-ci.toml +++ b/ruff-ci.toml @@ -29,6 +29,12 @@ ignore = [ "B008", # function calls in default arg value "PERF203", # Exception handling in loops + # Some noqas might be needed for compliance with full Ruff configuration, + # but not with the temporary ignores listed below in the CI configuration. + # To stop CI telling us we need to remove these noqas, temporarily disable + # the rule enforcing this in CI. 
+ "RUF100", + # Temporary ignores just for CI until the rules pass then they can be removed # from here to prevent future regressions "ANN001", From a3e8e94f3767f1e54c2036eaea8405ec6fb1ae12 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Mon, 16 Feb 2026 15:10:52 +0000 Subject: [PATCH 3/8] feat: add scheduler instrumentation base classes Implement the base class functionality for scheduler instrumentation, where new instrumentation options can be added by subclassing the `SchedulerInstrumentation` and registering the class with the `InstrumentationFactory` registry. This allows DVSim to potentially be extended to add custom scheduler instrumentation logic. Signed-off-by: Alex Jones --- src/dvsim/instrumentation/__init__.py | 32 ++++ src/dvsim/instrumentation/base.py | 206 ++++++++++++++++++++++++++ src/dvsim/instrumentation/factory.py | 48 ++++++ src/dvsim/instrumentation/metadata.py | 67 +++++++++ 4 files changed, 353 insertions(+) create mode 100644 src/dvsim/instrumentation/__init__.py create mode 100644 src/dvsim/instrumentation/base.py create mode 100644 src/dvsim/instrumentation/factory.py create mode 100644 src/dvsim/instrumentation/metadata.py diff --git a/src/dvsim/instrumentation/__init__.py b/src/dvsim/instrumentation/__init__.py new file mode 100644 index 0000000..cdeb044 --- /dev/null +++ b/src/dvsim/instrumentation/__init__.py @@ -0,0 +1,32 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
# SPDX-License-Identifier: Apache-2.0

"""DVSim Scheduler Instrumentation."""

# Re-export the public instrumentation API at package level so consumers can
# write `from dvsim.instrumentation import ...` without knowing submodules.
from dvsim.instrumentation.base import (
    CompositeInstrumentation,
    InstrumentationFragment,
    InstrumentationFragments,
    JobFragment,
    NoOpInstrumentation,
    SchedulerFragment,
    SchedulerInstrumentation,
    merge_instrumentation_report,
)
from dvsim.instrumentation.factory import InstrumentationFactory
from dvsim.instrumentation.metadata import MetadataInstrumentation, MetadataJobFragment

# Explicit public API of this package.
__all__ = (
    "CompositeInstrumentation",
    "InstrumentationFactory",
    "InstrumentationFragment",
    "InstrumentationFragments",
    "JobFragment",
    "MetadataInstrumentation",
    "MetadataJobFragment",
    "NoOpInstrumentation",
    "SchedulerFragment",
    "SchedulerInstrumentation",
    "merge_instrumentation_report",
)
# SPDX-License-Identifier: Apache-2.0

"""DVSim scheduler instrumentation base classes."""

from collections.abc import Collection, Iterable, Sequence
from dataclasses import asdict, dataclass
from typing import Any, TypeAlias

from dvsim.job.data import JobSpec
from dvsim.job.status import JobStatus
from dvsim.logging import log

__all__ = (
    "CompositeInstrumentation",
    "InstrumentationFragment",
    "InstrumentationFragments",
    "JobFragment",
    "NoOpInstrumentation",
    "SchedulerFragment",
    "SchedulerInstrumentation",
    "merge_instrumentation_report",
)


@dataclass
class InstrumentationFragment:
    """Base class for instrumentation reports / report fragments."""

    def to_dict(self) -> dict[str, Any]:
        """Convert the report fragment to a dictionary."""
        return asdict(self)


@dataclass
class SchedulerFragment(InstrumentationFragment):
    """Base class for instrumentation report fragments related to the scheduler."""


@dataclass
class JobFragment(InstrumentationFragment):
    """Base class for instrumentation report fragments related to individual jobs."""

    # The job this fragment describes; (full_name, target) is used as the
    # merge key in `merge_instrumentation_report`.
    job: JobSpec


def merge_instrumentation_report(
    scheduler_fragments: Collection[SchedulerFragment], job_fragments: Collection[JobFragment]
) -> dict[str, Any]:
    """Merge multiple instrumentation report fragments into a combined dictionary.

    When using multiple instrumentation mechanisms, this combines relevant per-job and global
    scheduler information into one common interface, to make the output more readable.

    Arguments:
        scheduler_fragments: Fragments describing the scheduler as a whole.
        job_fragments: Fragments describing individual jobs.

    Returns:
        A dict with a merged "scheduler" dict and a "jobs" list of merged per-job dicts.

    """
    log.info("Merging instrumentation report data...")

    # Merge information related to the scheduler. Later fragments overwrite
    # earlier ones on key collisions (plain dict.update semantics).
    scheduler: dict[str, Any] = {}
    for i, scheduler_frag in enumerate(scheduler_fragments, start=1):
        log.debug(
            "Merging instrumentation report scheduler data (%d/%d)", i, len(scheduler_fragments)
        )
        scheduler.update(scheduler_frag.to_dict())

    # Merge information related to specific jobs.
    jobs: dict[tuple[str, str], dict[str, Any]] = {}
    for i, job_frag in enumerate(job_fragments, start=1):
        log.debug("Merging instrumentation report job data (%d/%d)", i, len(job_fragments))
        spec = job_frag.job
        # We can uniquely identify jobs from the combination of their full name & target
        job_id = (spec.full_name, spec.target)
        job = jobs.get(job_id)
        if job is None:
            job = {}
            jobs[job_id] = job
        # Drop the "job" key so the (potentially large) spec is not duplicated
        # in every fragment's merged output.
        job.update({k: v for k, v in job_frag.to_dict().items() if k != "job"})

    log.info("Finished merging instrumentation report data.")
    return {"scheduler": scheduler, "jobs": list(jobs.values())}


# Each instrumentation object can report any number of information fragments about the
# scheduler and about its jobs.
InstrumentationFragments: TypeAlias = tuple[Sequence[SchedulerFragment], Sequence[JobFragment]]


class SchedulerInstrumentation:
    """Instrumentation for the scheduler.

    Base class for scheduler instrumentation, recording a variety of performance and
    behavioural metrics for analysis.
    """

    @property
    def name(self) -> str:
        """The name to use to refer to this instrumentation mechanism."""
        return self.__class__.__name__

    def start(self) -> None:
        """Begin instrumentation, starting whatever is needed before the scheduler is run."""
        log.info("Starting instrumentation: %s", self.name)
        self._start()

    def _start(self) -> None:
        # Hook for subclasses; the base implementation does nothing.
        return None

    def stop(self) -> None:
        """Stop instrumentation, ending any instrumentation-specific resources."""
        log.info("Stopping instrumentation: %s", self.name)
        self._stop()

    def _stop(self) -> None:
        # Hook for subclasses; the base implementation does nothing.
        return None

    def on_scheduler_start(self) -> None:
        """Notify instrumentation that the scheduler has begun."""
        return

    def on_scheduler_end(self) -> None:
        """Notify instrumentation that the scheduler has finished."""
        return

    def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:  # noqa: ARG002
        """Notify instrumentation of a change in status for some scheduled job."""
        return

    def build_report_fragments(self) -> InstrumentationFragments | None:
        """Build report fragments from the collected instrumentation information."""
        return None

    def build_report(self) -> dict[str, Any] | None:
        """Build an instrumentation report dict containing collected instrumentation info."""
        fragments = self.build_report_fragments()
        return None if fragments is None else merge_instrumentation_report(*fragments)


class NoOpInstrumentation(SchedulerInstrumentation):
    """Scheduler instrumentation which just does nothing."""

    def start(self) -> None:
        """Begin instrumentation, doing nothing (not even logging)."""

    def stop(self) -> None:
        """End instrumentation, doing nothing (not even logging)."""

    def build_report(self) -> dict[str, Any] | None:
        """Build an instrumentation report, doing nothing (not even logging)."""
        return None


class CompositeInstrumentation(SchedulerInstrumentation):
    """Composite instrumentation for combining several instrumentations to be used at once."""

    def __init__(self, instrumentations: Iterable[SchedulerInstrumentation]) -> None:
        """Construct an instrumentation object composed of many instrumentations.

        Arguments:
            instrumentations: The list of instrumentations to compose.

        """
        super().__init__()
        # Materialize eagerly: if a one-shot iterator (e.g. a generator) were
        # stored directly, the first lifecycle call (start/stop/notify) would
        # exhaust it and every later notification would silently reach zero
        # instrumentations.
        self._instrumentations = tuple(instrumentations)

    @property
    def name(self) -> str:
        """The name to use to refer to this composed instrumentation."""
        composed = ", ".join(inst.name for inst in self._instrumentations)
        return f"CompositeInstrumentation({composed})"

    def _start(self) -> None:
        for inst in self._instrumentations:
            inst.start()

    def _stop(self) -> None:
        for inst in self._instrumentations:
            inst.stop()

    def on_scheduler_start(self) -> None:
        """Notify instrumentation that the scheduler has begun."""
        for inst in self._instrumentations:
            inst.on_scheduler_start()

    def on_scheduler_end(self) -> None:
        """Notify instrumentation that the scheduler has finished."""
        for inst in self._instrumentations:
            inst.on_scheduler_end()

    def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
        """Notify instrumentation of a change in status for some scheduled job."""
        for inst in self._instrumentations:
            inst.on_job_status_change(job, status)

    def build_report_fragments(self) -> InstrumentationFragments | None:
        """Build report fragments from the collected instrumentation information."""
        scheduler_fragments = []
        job_fragments = []

        for inst in self._instrumentations:
            fragments = inst.build_report_fragments()
            if fragments is None:
                continue
            scheduler_fragments += fragments[0]
            job_fragments += fragments[1]

        return (scheduler_fragments, job_fragments)
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

"""DVSim scheduler instrumentation factory."""

from typing import ClassVar

from dvsim.instrumentation.base import (
    CompositeInstrumentation,
    NoOpInstrumentation,
    SchedulerInstrumentation,
)
from dvsim.instrumentation.metadata import MetadataInstrumentation

__all__ = ("InstrumentationFactory",)


class InstrumentationFactory:
    """Factory/registry for scheduler instrumentation implementations."""

    # Registered name -> instrumentation class, kept in registration order.
    _registry: ClassVar[dict[str, type[SchedulerInstrumentation]]] = {}

    @classmethod
    def register(cls, name: str, constructor: type[SchedulerInstrumentation]) -> None:
        """Register a new scheduler instrumentation type."""
        cls._registry[name] = constructor

    @classmethod
    def options(cls) -> list[str]:
        """Get a list of available scheduler instrumentation types."""
        # Iterating a dict yields its keys, so this equals list(keys()).
        return list(cls._registry)

    @classmethod
    def create(cls, names: list[str] | None) -> SchedulerInstrumentation:
        """Create a scheduler instrumentation of the given types.

        Arguments:
            names: A list of registered instrumentation names to compose into a single
                instrumentation object, or None / an empty list for no instrumentation.

        """
        if not names:
            # Nothing requested: an inert implementation keeps callers branch-free.
            return NoOpInstrumentation()

        # Basic job metadata is always collected whenever any instrumentation
        # is enabled, followed by every explicitly requested mechanism.
        composed: list[SchedulerInstrumentation] = [MetadataInstrumentation()]
        composed += (cls._registry[name]() for name in names)
        return CompositeInstrumentation(composed)
# SPDX-License-Identifier: Apache-2.0

"""DVSim scheduler instrumentation metadata (to be included in the generated report)."""

from dataclasses import dataclass

from dvsim.instrumentation.base import (
    InstrumentationFragments,
    JobFragment,
    SchedulerInstrumentation,
)
from dvsim.job.data import JobSpec
from dvsim.job.status import JobStatus

__all__ = (
    "MetadataInstrumentation",
    "MetadataJobFragment",
)


@dataclass
class MetadataJobFragment(JobFragment):
    """Instrumentation metadata for scheduled jobs, reporting the final status of the job."""

    name: str
    full_name: str
    job_type: str
    target: str
    dependencies: list[str]
    status: str


class MetadataInstrumentation(SchedulerInstrumentation):
    """Metadata instrumentation for the scheduler.

    Collects basic metadata about jobs (job spec and status) that are useful to include as
    part of the instrumentation report for analysis, regardless of other instrumentations.
    """

    def __init__(self) -> None:
        """Construct a `MetadataInstrumentation`."""
        super().__init__()
        # Maps (full_name, target) -> (spec, latest status string). Later
        # updates overwrite earlier ones, so the final status wins.
        self._jobs: dict[tuple[str, str], tuple[JobSpec, str]] = {}

    def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
        """Notify instrumentation of a change in status for some scheduled job."""
        key = (job.full_name, job.target)
        self._jobs[key] = (job, status.name.capitalize())

    def build_report_fragments(self) -> InstrumentationFragments | None:
        """Build report fragments from the collected instrumentation information."""
        fragments: list[JobFragment] = []
        for spec, final_status in self._jobs.values():
            # Keyword construction: robust against field reordering in the
            # dataclass hierarchy, and self-documenting at the call site.
            fragments.append(
                MetadataJobFragment(
                    job=spec,
                    name=spec.name,
                    full_name=spec.full_name,
                    job_type=spec.job_type,
                    target=spec.target,
                    dependencies=spec.dependencies,
                    status=final_status,
                )
            )
        return ([], fragments)
instrumentation for the scheduler and register it with the instrumentation registry/factory. This enables instrumentation for reporting when the scheduler itself started/ended, as well as when each job that was dispatched started/ended. Signed-off-by: Alex Jones --- src/dvsim/instrumentation/__init__.py | 8 ++ src/dvsim/instrumentation/factory.py | 5 ++ src/dvsim/instrumentation/timing.py | 109 ++++++++++++++++++++++++++ 3 files changed, 122 insertions(+) create mode 100644 src/dvsim/instrumentation/timing.py diff --git a/src/dvsim/instrumentation/__init__.py b/src/dvsim/instrumentation/__init__.py index cdeb044..4bcbbc2 100644 --- a/src/dvsim/instrumentation/__init__.py +++ b/src/dvsim/instrumentation/__init__.py @@ -16,6 +16,11 @@ ) from dvsim.instrumentation.factory import InstrumentationFactory from dvsim.instrumentation.metadata import MetadataInstrumentation, MetadataJobFragment +from dvsim.instrumentation.timing import ( + TimingInstrumentation, + TimingJobFragment, + TimingSchedulerFragment, +) __all__ = ( "CompositeInstrumentation", @@ -28,5 +33,8 @@ "NoOpInstrumentation", "SchedulerFragment", "SchedulerInstrumentation", + "TimingInstrumentation", + "TimingJobFragment", + "TimingSchedulerFragment", "merge_instrumentation_report", ) diff --git a/src/dvsim/instrumentation/factory.py b/src/dvsim/instrumentation/factory.py index 0e29360..5dc1044 100644 --- a/src/dvsim/instrumentation/factory.py +++ b/src/dvsim/instrumentation/factory.py @@ -12,6 +12,7 @@ SchedulerInstrumentation, ) from dvsim.instrumentation.metadata import MetadataInstrumentation +from dvsim.instrumentation.timing import TimingInstrumentation __all__ = ("InstrumentationFactory",) @@ -46,3 +47,7 @@ def create(cls, names: list[str] | None) -> SchedulerInstrumentation: instances: list[SchedulerInstrumentation] = [MetadataInstrumentation()] instances.extend([cls._registry[name]() for name in names]) return CompositeInstrumentation(instances) + + +# Register implemented instrumentation mechanisms 
# Copyright lowRISC contributors (OpenTitan project).
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

"""DVSim scheduler instrumentation for timing-related information."""

import time
from dataclasses import asdict, dataclass
from typing import Any

from dvsim.instrumentation.base import (
    InstrumentationFragments,
    JobFragment,
    SchedulerFragment,
    SchedulerInstrumentation,
)
from dvsim.job.data import JobSpec
from dvsim.job.status import JobStatus

__all__ = (
    "TimingInstrumentation",
    "TimingJobFragment",
    "TimingSchedulerFragment",
)


@dataclass
class TimingSchedulerFragment(SchedulerFragment):
    """Instrumented metrics about the scheduler reported by the `TimingInstrumentation`."""

    start_time: float | None = None
    end_time: float | None = None

    @property
    def duration(self) -> float | None:
        """The duration of the entire scheduler run."""
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def to_dict(self) -> dict[str, Any]:
        """Convert the scheduler metrics to a dictionary, including the `duration` property."""
        data = asdict(self)
        duration = self.duration
        # Compare against None rather than truthiness: a legitimate duration
        # of 0.0 seconds must still be included in the report.
        if duration is not None:
            data["duration"] = duration
        return data


@dataclass
class TimingJobFragment(JobFragment):
    """Instrumented metrics about a single job reported by the `TimingInstrumentation`."""

    start_time: float | None = None
    end_time: float | None = None

    @property
    def duration(self) -> float | None:
        """The duration of the job."""
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def to_dict(self) -> dict[str, Any]:
        """Convert the job metrics to a dictionary, including the `duration` property."""
        data = asdict(self)
        duration = self.duration
        # Compare against None rather than truthiness: a legitimate duration
        # of 0.0 seconds must still be included in the report.
        if duration is not None:
            data["duration"] = duration
        return data


class TimingInstrumentation(SchedulerInstrumentation):
    """Timing instrumentation for the scheduler.

    Collects information about the start time, end time and duration of the scheduler itself and
    all of the jobs that it dispatches.
    """

    def __init__(self) -> None:
        """Construct a `TimingInstrumentation`."""
        super().__init__()
        self._scheduler = TimingSchedulerFragment()
        # Maps (full_name, target) -> the timing fragment being built for that job.
        self._jobs: dict[tuple[str, str], TimingJobFragment] = {}

    def on_scheduler_start(self) -> None:
        """Notify instrumentation that the scheduler has begun."""
        self._scheduler.start_time = time.time()

    def on_scheduler_end(self) -> None:
        """Notify instrumentation that the scheduler has finished."""
        self._scheduler.end_time = time.time()

    def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
        """Notify instrumentation of a change in status for some scheduled job."""
        job_id = (job.full_name, job.target)
        job_info = self._jobs.get(job_id)
        if job_info is None:
            job_info = TimingJobFragment(job)
            self._jobs[job_id] = job_info

        # The first transition out of QUEUED marks the job's start; any
        # terminal status records (and may refresh) its end.
        if job_info.start_time is None and status != JobStatus.QUEUED:
            job_info.start_time = time.time()
        if status.ended:
            job_info.end_time = time.time()

    def build_report_fragments(self) -> InstrumentationFragments | None:
        """Build report fragments from the collected instrumentation information."""
        return ([self._scheduler], list(self._jobs.values()))
The scheduler now takes some instrumentation object as input and will start/stop to wrap the scheduler's lifetime, and will notify it of certain events (scheduler start, stop, job status change). On stopping instrumentation, the scheduler will also generate dump the generated metrics as a JSON file at a report path, if specified. In the future we may wish to modify this abstraction so that the scheduler itself does not handle the report writing (and instead either have some parent abstraction handle it, or the instrumentation itself), but the current scheduler architecture (without any significant refactoring) means that the status is not encapsulated with the job, and thus must be injected separately into the report by the scheduler. Signed-off-by: Alex Jones --- src/dvsim/instrumentation/__init__.py | 5 +++ src/dvsim/instrumentation/base.py | 18 ++++++++++ src/dvsim/instrumentation/runtime.py | 47 +++++++++++++++++++++++++++ src/dvsim/scheduler.py | 34 +++++++++++++++---- 4 files changed, 97 insertions(+), 7 deletions(-) create mode 100644 src/dvsim/instrumentation/runtime.py diff --git a/src/dvsim/instrumentation/__init__.py b/src/dvsim/instrumentation/__init__.py index 4bcbbc2..f28cfa2 100644 --- a/src/dvsim/instrumentation/__init__.py +++ b/src/dvsim/instrumentation/__init__.py @@ -16,6 +16,7 @@ ) from dvsim.instrumentation.factory import InstrumentationFactory from dvsim.instrumentation.metadata import MetadataInstrumentation, MetadataJobFragment +from dvsim.instrumentation.runtime import flush, get, set_instrumentation, set_report_path from dvsim.instrumentation.timing import ( TimingInstrumentation, TimingJobFragment, @@ -36,5 +37,9 @@ "TimingInstrumentation", "TimingJobFragment", "TimingSchedulerFragment", + "flush", + "get", "merge_instrumentation_report", + "set_instrumentation", + "set_report_path", ) diff --git a/src/dvsim/instrumentation/base.py b/src/dvsim/instrumentation/base.py index 6216eae..b6d0804 100644 --- a/src/dvsim/instrumentation/base.py 
+++ b/src/dvsim/instrumentation/base.py @@ -4,8 +4,10 @@ """DVSim scheduler instrumentation base classes.""" +import json from collections.abc import Collection, Iterable, Sequence from dataclasses import asdict, dataclass +from pathlib import Path from typing import Any, TypeAlias from dvsim.job.data import JobSpec @@ -131,9 +133,25 @@ def build_report_fragments(self) -> InstrumentationFragments | None: def build_report(self) -> dict[str, Any] | None: """Build an instrumentation report dict containing collected instrumentation info.""" + log.info("Building instrumentation report...") fragments = self.build_report_fragments() return None if fragments is None else merge_instrumentation_report(*fragments) + def dump_json_report(self, report_path: Path) -> None: + """Dump a given JSON instrumentation report to a specified file path.""" + report = self.build_report() + if not report: + return + log.info("Dumping JSON instrumentation report...") + if report_path.is_dir(): + raise ValueError("Metric report path cannot be a directory.") + try: + report_path.parent.mkdir(parents=True, exist_ok=True) + report_path.write_text(json.dumps(report, indent=2)) + log.info("JSON instrumentation report dumped to: %s", str(report_path)) + except (OSError, FileNotFoundError) as e: + log.error("Error writing instrumented metrics to %s: %s", str(report_path), str(e)) + class NoOpInstrumentation(SchedulerInstrumentation): """Scheduler instrumentation which just does nothing.""" diff --git a/src/dvsim/instrumentation/runtime.py b/src/dvsim/instrumentation/runtime.py new file mode 100644 index 0000000..c7d90e5 --- /dev/null +++ b/src/dvsim/instrumentation/runtime.py @@ -0,0 +1,47 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
# SPDX-License-Identifier: Apache-2.0

"""Process-wide runtime state for DVSim scheduler instrumentation.

Holds the globally configured instrumentation object and the path that the JSON
instrumentation report should be written to, so that the CLI can configure
instrumentation once and the scheduler can later retrieve and flush it.
"""

from pathlib import Path

from dvsim.instrumentation.base import SchedulerInstrumentation

__all__ = (
    "flush",
    "get",
    "set_instrumentation",
    "set_report_path",
)


class _Runtime:
    # Simple holder for the module-level singleton state: the configured
    # instrumentation object and the report destination path.
    def __init__(self) -> None:
        self.instrumentation: SchedulerInstrumentation | None = None
        self.report_path: Path | None = None


# Module-level singleton instance backing the getters/setters below.
_runtime = _Runtime()


def set_instrumentation(instrumentation: SchedulerInstrumentation | None) -> None:
    """Configure the global instrumentation singleton."""
    _runtime.instrumentation = instrumentation


def set_report_path(path: Path | None) -> None:
    """Configure the instrumentation report path."""
    _runtime.report_path = path


def get() -> SchedulerInstrumentation | None:
    """Get the configured global instrumentation."""
    return _runtime.instrumentation


def flush() -> None:
    """Dump the instrumentation report as JSON to the configured report path."""
    # No-op unless both an instrumentation object and a report path were set.
    if _runtime.instrumentation is None or not _runtime.report_path:
        return
    _runtime.instrumentation.dump_json_report(_runtime.report_path)
""" + # Start any instrumentation + inst = instrumentation.get() + self._instrumentation = NoOpInstrumentation() if inst is None else inst + self._instrumentation.start() + self._jobs: Mapping[str, JobSpec] = {i.full_name: i for i in items} # 'scheduled[target][cfg]' is a list of JobSpec object names for the chosen @@ -195,6 +202,8 @@ def run(self) -> Sequence[CompletedJobStatus]: Returns the results (status) of all items dispatched for all targets and cfgs. """ + # Notify instrumentation that the scheduler started + self._instrumentation.on_scheduler_start() timer = Timer() # On SIGTERM or SIGINT, tell the runner to quit. @@ -243,6 +252,11 @@ def on_signal(signal_received: int, _: FrameType | None) -> None: # Cleanup the status printer. self._status_printer.exit() + # Finish instrumentation and generate the instrumentation report + self._instrumentation.on_scheduler_end() + self._instrumentation.stop() + instrumentation.flush() + # We got to the end without anything exploding. Return the results. results = [] for name, status in self.job_status.items(): @@ -502,14 +516,17 @@ def _poll(self, hms: str) -> bool: if status == JobStatus.PASSED: self._passed[target].add(job_name) + self._instrumentation.on_job_status_change(self._jobs[job_name], status) elif status == JobStatus.FAILED: self._failed[target].add(job_name) + self._instrumentation.on_job_status_change(self._jobs[job_name], status) level = log.ERROR else: # Killed, still Queued, or some error when dispatching. 
self._killed[target].add(job_name) + self._instrumentation.on_job_status_change(self._jobs[job_name], status.KILLED) level = log.ERROR self._running[target].pop(self._last_item_polled_idx[target]) @@ -568,6 +585,7 @@ def _dispatch_job(self, hms: str, target: str, job_name: str) -> None: self._running[target].append(job_name) self.job_status[job_name] = JobStatus.DISPATCHED + self._instrumentation.on_job_status_change(self._jobs[job_name], JobStatus.DISPATCHED) def _dispatch(self, hms: str) -> None: """Dispatch some queued items if possible. @@ -708,11 +726,12 @@ def _cancel_item(self, job_name: str, *, cancel_successors: bool = True) -> None cancel_successors: if set then cancel successors as well (True). """ - target = self._jobs[job_name].target + job = self._jobs[job_name] self.job_status[job_name] = JobStatus.KILLED - self._killed[target].add(job_name) - if job_name in self._queued[target]: - self._queued[target].remove(job_name) + self._killed[job.target].add(job_name) + self._instrumentation.on_job_status_change(job, JobStatus.KILLED) + if job_name in self._queued[job.target]: + self._queued[job.target].remove(job_name) else: self._unschedule_item(job_name) @@ -726,9 +745,10 @@ def _kill_item(self, job_name: str) -> None: job_name: name of the job to kill """ - target = self._jobs[job_name].target + job = self._jobs[job_name] self._launchers[job_name].kill() self.job_status[job_name] = JobStatus.KILLED - self._killed[target].add(job_name) - self._running[target].remove(job_name) + self._killed[job.target].add(job_name) + self._instrumentation.on_job_status_change(job, JobStatus.KILLED) + self._running[job.target].remove(job_name) self._cancel_successors(job_name) From b547267b28b97794e5d3b6a8304c8917e0459a35 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Mon, 16 Feb 2026 15:27:53 +0000 Subject: [PATCH 6/8] feat: add scheduler instrumentation CLI argument Adds the `--instrument` option to the main DVSim CLI which can be used to specify anywhere from zero 
(default) to multiple instrumentations to use with DVSim's scheduler. This lets users of DVSim customize the level of instrumentation that they which to use and select which information is important to measure, allowing those who need data to capture more data and those who need performance to disable all instrumentation by default. This instrumentation is constructed on the command line and passed through the flow objects to the scheduler, where the instrumentation report (where one exists) is currently written as a single metrics.json file in the `reports` directory, next to the existing generated HTML reports (where these exist). Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 13 +++++++++++++ src/dvsim/flow/base.py | 7 ++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index 30113db..e146708 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -32,6 +32,7 @@ from pathlib import Path from dvsim.flow.factory import make_cfg +from dvsim.instrumentation import InstrumentationFactory, set_instrumentation from dvsim.job.deploy import RunTest from dvsim.launcher.base import Launcher from dvsim.launcher.factory import set_launcher_type @@ -755,6 +756,15 @@ def parse_args(): dvg = parser.add_argument_group("Controlling DVSim itself") + dvg.add_argument( + "--instrument", + dest="instrumentation", + nargs="+", + default=[], + choices=InstrumentationFactory.options(), + help="Enable scheduler instrumentation (can specify multiple types).", + ) + dvg.add_argument( "--print-interval", "-pi", @@ -868,6 +878,9 @@ def main() -> None: FakeLauncher.max_parallel = args.max_parallel set_launcher_type(is_local=args.local, fake=args.fake) + # Configure scheduler instrumentation + set_instrumentation(InstrumentationFactory.create(args.instrumentation)) + # Build infrastructure from hjson file and create the list of items to # be deployed. 
cfg = make_cfg(args.cfg, args, proj_root) diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 7cbd59e..149fec5 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -15,6 +15,7 @@ import hjson +from dvsim import instrumentation from dvsim.flow.hjson import set_target_attribute from dvsim.job.data import CompletedJobStatus from dvsim.launcher.factory import get_launcher_cls @@ -152,9 +153,13 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None: self._expand() # Construct the path variables after variable expansion. - self.results_dir = Path(self.scratch_base_path) / "reports" / self.rel_path + reports_dir = Path(self.scratch_base_path) / "reports" + self.results_dir = reports_dir / self.rel_path self.results_page = self.results_dir / self.results_html_name + # Configure the report path for instrumentation + instrumentation.set_report_path(reports_dir / "metrics.json") + # Run any final checks self._post_init() From db3595902c3a9525f283f19a83804668350bd6c2 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Mon, 16 Feb 2026 15:07:04 +0000 Subject: [PATCH 7/8] build: add `psutil` Python dependency For instrumentation of the scheduler, this allows us to get more detailed information about system compute resource usage, including e.g. memory (RSS, VMS, swap) and per-core CPU utilization. 
Signed-off-by: Alex Jones --- pyproject.toml | 1 + uv.lock | 30 ++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index ff8b6e1..054372a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "hjson>=3.1.0", "jinja2>=3.1.6", "logzero>=1.7.0", + "psutil>=7.2.2", "pydantic>=2.9.2", "pyyaml>=6.0.2", "tabulate>=0.9.0", diff --git a/uv.lock b/uv.lock index b05cbd4..131c3d3 100644 --- a/uv.lock +++ b/uv.lock @@ -322,6 +322,7 @@ dependencies = [ { name = "hjson" }, { name = "jinja2" }, { name = "logzero" }, + { name = "psutil" }, { name = "pydantic" }, { name = "pyyaml" }, { name = "tabulate" }, @@ -400,6 +401,7 @@ requires-dist = [ { name = "ipython", marker = "extra == 'debug'", specifier = ">=8.18.1" }, { name = "jinja2", specifier = ">=3.1.6" }, { name = "logzero", specifier = ">=1.7.0" }, + { name = "psutil", specifier = ">=7.2.2" }, { name = "pydantic", specifier = ">=2.9.2" }, { name = "pyhamcrest", marker = "extra == 'test'", specifier = ">=2.1.0" }, { name = "pyright", marker = "extra == 'typing'", specifier = ">=1.1.381" }, @@ -816,6 +818,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/84/03/0d3ce49e2505ae70cf43bc5bb3033955d2fc9f932163e84dc0779cc47f48/prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955", size = 391431, upload-time = "2025-08-27T15:23:59.498Z" }, ] +[[package]] +name = "psutil" +version = "7.2.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/aa/c6/d1ddf4abb55e93cebc4f2ed8b5d6dbad109ecb8d63748dd2b20ab5e57ebe/psutil-7.2.2.tar.gz", hash = "sha256:0746f5f8d406af344fd547f1c8daa5f5c33dbc293bb8d6a16d80b4bb88f59372", size = 493740, upload-time = "2026-01-28T18:14:54.428Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/51/08/510cbdb69c25a96f4ae523f733cdc963ae654904e8db864c07585ef99875/psutil-7.2.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:2edccc433cbfa046b980b0df0171cd25bcaeb3a68fe9022db0979e7aa74a826b", size = 130595, upload-time = "2026-01-28T18:14:57.293Z" }, + { url = "https://files.pythonhosted.org/packages/d6/f5/97baea3fe7a5a9af7436301f85490905379b1c6f2dd51fe3ecf24b4c5fbf/psutil-7.2.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:e78c8603dcd9a04c7364f1a3e670cea95d51ee865e4efb3556a3a63adef958ea", size = 131082, upload-time = "2026-01-28T18:14:59.732Z" }, + { url = "https://files.pythonhosted.org/packages/37/d6/246513fbf9fa174af531f28412297dd05241d97a75911ac8febefa1a53c6/psutil-7.2.2-cp313-cp313t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1a571f2330c966c62aeda00dd24620425d4b0cc86881c89861fbc04549e5dc63", size = 181476, upload-time = "2026-01-28T18:15:01.884Z" }, + { url = "https://files.pythonhosted.org/packages/b8/b5/9182c9af3836cca61696dabe4fd1304e17bc56cb62f17439e1154f225dd3/psutil-7.2.2-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:917e891983ca3c1887b4ef36447b1e0873e70c933afc831c6b6da078ba474312", size = 184062, upload-time = "2026-01-28T18:15:04.436Z" }, + { url = "https://files.pythonhosted.org/packages/16/ba/0756dca669f5a9300d0cbcbfae9a4c30e446dfc7440ffe43ded5724bfd93/psutil-7.2.2-cp313-cp313t-win_amd64.whl", hash = "sha256:ab486563df44c17f5173621c7b198955bd6b613fb87c71c161f827d3fb149a9b", size = 139893, upload-time = "2026-01-28T18:15:06.378Z" }, + { url = "https://files.pythonhosted.org/packages/1c/61/8fa0e26f33623b49949346de05ec1ddaad02ed8ba64af45f40a147dbfa97/psutil-7.2.2-cp313-cp313t-win_arm64.whl", hash = "sha256:ae0aefdd8796a7737eccea863f80f81e468a1e4cf14d926bd9b6f5f2d5f90ca9", size = 135589, upload-time = "2026-01-28T18:15:08.03Z" }, + { url = 
"https://files.pythonhosted.org/packages/81/69/ef179ab5ca24f32acc1dac0c247fd6a13b501fd5534dbae0e05a1c48b66d/psutil-7.2.2-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:eed63d3b4d62449571547b60578c5b2c4bcccc5387148db46e0c2313dad0ee00", size = 130664, upload-time = "2026-01-28T18:15:09.469Z" }, + { url = "https://files.pythonhosted.org/packages/7b/64/665248b557a236d3fa9efc378d60d95ef56dd0a490c2cd37dafc7660d4a9/psutil-7.2.2-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:7b6d09433a10592ce39b13d7be5a54fbac1d1228ed29abc880fb23df7cb694c9", size = 131087, upload-time = "2026-01-28T18:15:11.724Z" }, + { url = "https://files.pythonhosted.org/packages/d5/2e/e6782744700d6759ebce3043dcfa661fb61e2fb752b91cdeae9af12c2178/psutil-7.2.2-cp314-cp314t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1fa4ecf83bcdf6e6c8f4449aff98eefb5d0604bf88cb883d7da3d8d2d909546a", size = 182383, upload-time = "2026-01-28T18:15:13.445Z" }, + { url = "https://files.pythonhosted.org/packages/57/49/0a41cefd10cb7505cdc04dab3eacf24c0c2cb158a998b8c7b1d27ee2c1f5/psutil-7.2.2-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e452c464a02e7dc7822a05d25db4cde564444a67e58539a00f929c51eddda0cf", size = 185210, upload-time = "2026-01-28T18:15:16.002Z" }, + { url = "https://files.pythonhosted.org/packages/dd/2c/ff9bfb544f283ba5f83ba725a3c5fec6d6b10b8f27ac1dc641c473dc390d/psutil-7.2.2-cp314-cp314t-win_amd64.whl", hash = "sha256:c7663d4e37f13e884d13994247449e9f8f574bc4655d509c3b95e9ec9e2b9dc1", size = 141228, upload-time = "2026-01-28T18:15:18.385Z" }, + { url = "https://files.pythonhosted.org/packages/f2/fc/f8d9c31db14fcec13748d373e668bc3bed94d9077dbc17fb0eebc073233c/psutil-7.2.2-cp314-cp314t-win_arm64.whl", hash = "sha256:11fe5a4f613759764e79c65cf11ebdf26e33d6dd34336f8a337aa2996d71c841", size = 136284, upload-time = "2026-01-28T18:15:19.912Z" }, + { url = 
"https://files.pythonhosted.org/packages/e7/36/5ee6e05c9bd427237b11b3937ad82bb8ad2752d72c6969314590dd0c2f6e/psutil-7.2.2-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:ed0cace939114f62738d808fdcecd4c869222507e266e574799e9c0faa17d486", size = 129090, upload-time = "2026-01-28T18:15:22.168Z" }, + { url = "https://files.pythonhosted.org/packages/80/c4/f5af4c1ca8c1eeb2e92ccca14ce8effdeec651d5ab6053c589b074eda6e1/psutil-7.2.2-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:1a7b04c10f32cc88ab39cbf606e117fd74721c831c98a27dc04578deb0c16979", size = 129859, upload-time = "2026-01-28T18:15:23.795Z" }, + { url = "https://files.pythonhosted.org/packages/b5/70/5d8df3b09e25bce090399cf48e452d25c935ab72dad19406c77f4e828045/psutil-7.2.2-cp36-abi3-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:076a2d2f923fd4821644f5ba89f059523da90dc9014e85f8e45a5774ca5bc6f9", size = 155560, upload-time = "2026-01-28T18:15:25.976Z" }, + { url = "https://files.pythonhosted.org/packages/63/65/37648c0c158dc222aba51c089eb3bdfa238e621674dc42d48706e639204f/psutil-7.2.2-cp36-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b0726cecd84f9474419d67252add4ac0cd9811b04d61123054b9fb6f57df6e9e", size = 156997, upload-time = "2026-01-28T18:15:27.794Z" }, + { url = "https://files.pythonhosted.org/packages/8e/13/125093eadae863ce03c6ffdbae9929430d116a246ef69866dad94da3bfbc/psutil-7.2.2-cp36-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:fd04ef36b4a6d599bbdb225dd1d3f51e00105f6d48a28f006da7f9822f2606d8", size = 148972, upload-time = "2026-01-28T18:15:29.342Z" }, + { url = "https://files.pythonhosted.org/packages/04/78/0acd37ca84ce3ddffaa92ef0f571e073faa6d8ff1f0559ab1272188ea2be/psutil-7.2.2-cp36-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:b58fabe35e80b264a4e3bb23e6b96f9e45a3df7fb7eed419ac0e5947c61e47cc", size = 148266, upload-time = "2026-01-28T18:15:31.597Z" }, + { url = 
"https://files.pythonhosted.org/packages/b4/90/e2159492b5426be0c1fef7acba807a03511f97c5f86b3caeda6ad92351a7/psutil-7.2.2-cp37-abi3-win_amd64.whl", hash = "sha256:eb7e81434c8d223ec4a219b5fc1c47d0417b12be7ea866e24fb5ad6e84b3d988", size = 137737, upload-time = "2026-01-28T18:15:33.849Z" }, + { url = "https://files.pythonhosted.org/packages/8c/c7/7bb2e321574b10df20cbde462a94e2b71d05f9bbda251ef27d104668306a/psutil-7.2.2-cp37-abi3-win_arm64.whl", hash = "sha256:8c233660f575a5a89e6d4cb65d9f938126312bca76d8fe087b947b3a1aaac9ee", size = 134617, upload-time = "2026-01-28T18:15:36.514Z" }, +] + [[package]] name = "ptyprocess" version = "0.7.0" From c156eae87b94d7001f742e02fb31fc3f596be68c Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Mon, 16 Feb 2026 15:33:54 +0000 Subject: [PATCH 8/8] feat: add scheduler instrumentation for measuring system resources Adds an additional instrumentation mechanism to DVSim's scheduler and registers it with the instrumentation factory/registry to allow users to optionally enable instrumentation of system resources. This captures a variety of useful system resources including memory utilization (RSS, VMS, swap) and CPU utilization (percentage and time, plus per-core percentage) for both the system as a whole and specifically for the DVSim / scheduler process overhead. Since there is no "per-dispatched-job" process that is transparently available to the scheduler, the resource metrics currently reported for each job are instead an aggregate of the system resource metric samples taken over the time period for which that job was running. Currently, based on existing tools and polling frequencies, this is set to poll for system resources every 0.5 seconds (2x a second). In the future it would be nice to consider how best to make this customizable (perhaps through an additional CLI argument, or through a config file option?) 
Signed-off-by: Alex Jones --- src/dvsim/instrumentation/__init__.py | 8 + src/dvsim/instrumentation/factory.py | 2 + src/dvsim/instrumentation/resources.py | 281 +++++++++++++++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 src/dvsim/instrumentation/resources.py diff --git a/src/dvsim/instrumentation/__init__.py b/src/dvsim/instrumentation/__init__.py index f28cfa2..b88c46a 100644 --- a/src/dvsim/instrumentation/__init__.py +++ b/src/dvsim/instrumentation/__init__.py @@ -16,6 +16,11 @@ ) from dvsim.instrumentation.factory import InstrumentationFactory from dvsim.instrumentation.metadata import MetadataInstrumentation, MetadataJobFragment +from dvsim.instrumentation.resources import ( + ResourceInstrumentation, + ResourceJobFragment, + ResourceSchedulerFragment, +) from dvsim.instrumentation.runtime import flush, get, set_instrumentation, set_report_path from dvsim.instrumentation.timing import ( TimingInstrumentation, @@ -32,6 +37,9 @@ "MetadataInstrumentation", "MetadataJobFragment", "NoOpInstrumentation", + "ResourceInstrumentation", + "ResourceJobFragment", + "ResourceSchedulerFragment", "SchedulerFragment", "SchedulerInstrumentation", "TimingInstrumentation", diff --git a/src/dvsim/instrumentation/factory.py b/src/dvsim/instrumentation/factory.py index 5dc1044..7ab24a3 100644 --- a/src/dvsim/instrumentation/factory.py +++ b/src/dvsim/instrumentation/factory.py @@ -12,6 +12,7 @@ SchedulerInstrumentation, ) from dvsim.instrumentation.metadata import MetadataInstrumentation +from dvsim.instrumentation.resources import ResourceInstrumentation from dvsim.instrumentation.timing import TimingInstrumentation __all__ = ("InstrumentationFactory",) @@ -51,3 +52,4 @@ def create(cls, names: list[str] | None) -> SchedulerInstrumentation: # Register implemented instrumentation mechanisms InstrumentationFactory.register("timing", TimingInstrumentation) +InstrumentationFactory.register("resources", ResourceInstrumentation) diff --git 
a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py new file mode 100644 index 0000000..f010f01 --- /dev/null +++ b/src/dvsim/instrumentation/resources.py @@ -0,0 +1,281 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation for system resource usage.""" + +import os +import threading +import time +from dataclasses import dataclass +from typing import TypeAlias + +import psutil + +from dvsim.instrumentation.base import ( + InstrumentationFragments, + JobFragment, + SchedulerFragment, + SchedulerInstrumentation, +) +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus + +__all__ = ( + "ResourceInstrumentation", + "ResourceJobFragment", + "ResourceSchedulerFragment", +) + + +@dataclass +class ResourceSchedulerFragment(SchedulerFragment): + """Instrumented metrics about the scheduler reported by the `ResourceInstrumentation`.""" + + # Scheduler / DVSim process overhead + scheduler_rss_bytes: int | None = None + scheduler_vms_bytes: int | None = None + scheduler_cpu_percent: float | None = None + scheduler_cpu_time: float | None = None + + # System-wide metrics + sys_rss_bytes: int | None = None + sys_swap_used_bytes: int | None = None + sys_cpu_percent: float | None = None + sys_cpu_per_core: list[float] | None = None + + num_resource_samples: int = 0 + + +@dataclass +class ResourceJobFragment(JobFragment): + """Instrumented metrics about jobs reported by the `ResourceInstrumentation`. + + Since we can't directly measure each deployed job, these are instead averages and system + information over the course of the job's runtime. 
+ """ + + max_rss_bytes: int | None = None + avg_rss_bytes: float | None = None + avg_cpu_percent: float | None = None + avg_cpu_per_core: list[float] | None = None + + num_resource_samples: int = 0 + + +class JobResourceAggregate: + """Resource Instrumentation aggregation for a single deployed job. + + Tracks aggregate information over a number of samples whilst minimizing memory usage. + """ + + def __init__(self, job: JobSpec, num_cores: int | None) -> None: + """Construct an aggregate for storing sampling info for a given job specification. + + Arguments: + job: The specification of the job which is having its information aggregated. + num_cores: The number of logical CPU cores available on the host system. + + """ + self.job_spec = job + self.num_cores = num_cores + self.sample_count = 0 + self.sum_rss = 0.0 + self.max_rss = 0 + self.sum_cpu = 0.0 + if num_cores: + self.sum_cpu_per_core = [0.0] * num_cores + + def add_sample(self, rss: int, cpu: float, cpu_per_core: list[float]) -> None: + """Aggregate an additional resource sample taken during this job's active window.""" + self.sample_count += 1 + self.sum_rss += rss + self.max_rss = max(self.max_rss, rss) + self.sum_cpu += cpu + if self.num_cores: + self.sum_cpu_per_core = [ + total + n for total, n in zip(self.sum_cpu_per_core, cpu_per_core, strict=True) + ] + + def finalize(self) -> ResourceJobFragment: + """Finalize the aggregated information for a job, generating a report fragment.""" + if self.sample_count == 0: + return ResourceJobFragment(self.job_spec) + + if self.num_cores: + avg_cpu_per_core = [x / self.sample_count for x in self.sum_cpu_per_core] + else: + avg_cpu_per_core = None + + return ResourceJobFragment( + self.job_spec, + max_rss_bytes=self.max_rss, + avg_rss_bytes=self.sum_rss / self.sample_count, + avg_cpu_percent=self.sum_cpu / self.sample_count, + avg_cpu_per_core=avg_cpu_per_core, + num_resource_samples=self.sample_count, + ) + + +# Unique identifier to disambiguate a job (full_name, 
target) +JobId: TypeAlias = tuple[str, str] + + +class ResourceInstrumentation(SchedulerInstrumentation): + """Resource instrumentation for the scheduler. + + Collects information about the compute resources used throughout the entire duration of + the scheduler, as well as during the window within which each job is dispatched. This + includes memory usage (max & avg RSS bytes), virtual memory (VMS bytes), swap usage, CPU + time and per-core CPU utilisation. + + Since we have no access to job sub-processes, per-job instrumentation is the aggregate + of the samples that fall within that job's execution window. + """ + + def __init__(self, sample_interval: float = 0.5) -> None: + """Construct a resource instrumentation. + + Arguments: + sample_interval: The period (in seconds) per poll / sample produced. + + """ + self.sample_interval = sample_interval + self._running = False + self._thread: threading.Thread | None = None + self._lock = threading.Lock() + + self._scheduler_process = psutil.Process(os.getpid()) + self._num_cores = psutil.cpu_count(logical=True) + self._sample_count = 0 + + # Scheduler (DVSim process) / System Memory usage + self._scheduler_sum_rss = 0 + self._scheduler_max_rss = 0 + self._scheduler_sum_vms = 0 + self._sys_max_rss = 0 + self._sys_max_swap = 0 + + # Scheduler (DVSim process) / System CPU usage + self._scheduler_cpu_time_start = 0 + self._scheduler_cpu_time_end = 0 + self._scheduler_sum_cpu = 0 + self._sys_sum_cpu = 0 + if self._num_cores is not None: + self._sys_sum_cpu_per_core = [0] * self._num_cores + + # Job aggregate metrics + self._running_jobs: dict[JobId, JobResourceAggregate] = {} + self._finished_jobs: dict[JobId, JobResourceAggregate] = {} + + def _scheduler_cpu_time(self) -> float | int: + """Get the CPU time of the scheduler process. + + Includes user mode time and system time (kernel mode). Excludes the user & system time + of child processes and any iowait time spent for blocking I/O to complete. 
+ """ + return sum(self._scheduler_process.cpu_times()[:2]) + + def _start(self) -> None: + """Start system-wide sampling in the background on another thread.""" + self._running = True + self._scheduler_process.cpu_percent(None) # Start measuring + self._thread = threading.Thread(target=self._sampling_loop, daemon=True) + self._thread.start() + + def _stop(self) -> None: + self._running = False + if self._thread: + self._thread.join() + + def _sampling_loop(self) -> None: + next_run_at = time.time() + while self._running: + next_run_at += self.sample_interval + + scheduler_memory_info = self._scheduler_process.memory_info() + sys_rss = psutil.virtual_memory().used + sys_cpu = psutil.cpu_percent(None) + sys_cpu_per_core = psutil.cpu_percent(percpu=True) + + # Update scheduler aggregates + self._sample_count += 1 + self._scheduler_sum_rss += scheduler_memory_info.rss + self._scheduler_sum_vms += scheduler_memory_info.vms + self._scheduler_sum_cpu += self._scheduler_process.cpu_percent(None) + self._scheduler_max_rss = max(self._scheduler_max_rss, scheduler_memory_info.rss) + + # Update system-wide metrics + self._sys_max_swap = max(self._sys_max_swap, psutil.swap_memory().used) + self._sys_max_rss = max(self._sys_max_rss, sys_rss) + self._sys_sum_cpu += sys_cpu + if self._num_cores is not None: + self._sys_sum_cpu_per_core = [ + total + n + for total, n in zip(self._sys_sum_cpu_per_core, sys_cpu_per_core, strict=True) + ] + + # Update all running job aggregates with system sample + with self._lock: + for aggregate in self._running_jobs.values(): + aggregate.add_sample(sys_rss, sys_cpu, sys_cpu_per_core) + + sleep_time = max(next_run_at - time.time(), 0) + time.sleep(sleep_time) + + def on_scheduler_start(self) -> None: + """Notify instrumentation that the scheduler has begun.""" + self._scheduler_cpu_time_start = self._scheduler_cpu_time() + + def on_scheduler_end(self) -> None: + """Notify instrumentation that the scheduler has finished.""" + 
self._scheduler_cpu_time_end = self._scheduler_cpu_time() + + def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: + """Notify instrumentation of a change in status for some scheduled job.""" + job_id = (job.full_name, job.target) + + with self._lock: + running = job_id in self._running_jobs + started = running or job_id in self._finished_jobs + if not started and status != JobStatus.QUEUED: + self._running_jobs[job_id] = JobResourceAggregate(job, self._num_cores) + running = True + if running and status.ended: + aggregates = self._running_jobs.pop(job_id) + self._finished_jobs[job_id] = aggregates + + def build_report_fragments(self) -> InstrumentationFragments | None: + """Build report fragments from the collected instrumentation information.""" + if self._running: + raise RuntimeError("Cannot build instrumentation report whilst still running!") + + if self._sample_count <= 0: + scheduler_frag = ResourceSchedulerFragment() + else: + scheduler_cpu_time = self._scheduler_cpu_time_end - self._scheduler_cpu_time_start + if self._num_cores is not None: + sys_cpu_per_core = [s / self._sample_count for s in self._sys_sum_cpu_per_core] + else: + sys_cpu_per_core = None + try: + vms_bytes = round(self._scheduler_sum_vms / self._sample_count) + except (ValueError, TypeError): + # Suppress unknown types in VMS measurements + vms_bytes = None + + scheduler_frag = ResourceSchedulerFragment( + scheduler_rss_bytes=self._scheduler_max_rss, + scheduler_vms_bytes=vms_bytes, + scheduler_cpu_percent=self._scheduler_sum_cpu / self._sample_count, + scheduler_cpu_time=scheduler_cpu_time, + sys_rss_bytes=self._sys_max_rss, + sys_cpu_percent=self._sys_sum_cpu / self._sample_count, + sys_cpu_per_core=sys_cpu_per_core, + sys_swap_used_bytes=self._sys_max_swap, + num_resource_samples=self._sample_count, + ) + + aggregates = list(self._finished_jobs.values()) + list(self._running_jobs.values()) + job_frags = [aggregate.finalize() for aggregate in aggregates] + return 
([scheduler_frag], job_frags)