diff --git a/pyproject.toml b/pyproject.toml index ff8b6e16..054372aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "hjson>=3.1.0", "jinja2>=3.1.6", "logzero>=1.7.0", + "psutil>=7.2.2", "pydantic>=2.9.2", "pyyaml>=6.0.2", "tabulate>=0.9.0", diff --git a/ruff-ci.toml b/ruff-ci.toml index 9f59e109..c4f05178 100644 --- a/ruff-ci.toml +++ b/ruff-ci.toml @@ -29,6 +29,12 @@ ignore = [ "B008", # function calls in default arg value "PERF203", # Exception handling in loops + # Some noqas might be needed for compliance with full Ruff configuration, + # but not with the temporary ignores listed below in the CI configuration. + # To stop CI telling us we need to remove these noqas, temporarily disable + # the rule enforcing this in CI. + "RUF100", + # Temporary ignores just for CI until the rules pass then they can be removed # from here to prevent future regressions "ANN001", diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index 30113dbd..e1467084 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -32,6 +32,7 @@ from pathlib import Path from dvsim.flow.factory import make_cfg +from dvsim.instrumentation import InstrumentationFactory, set_instrumentation from dvsim.job.deploy import RunTest from dvsim.launcher.base import Launcher from dvsim.launcher.factory import set_launcher_type @@ -755,6 +756,15 @@ def parse_args(): dvg = parser.add_argument_group("Controlling DVSim itself") + dvg.add_argument( + "--instrument", + dest="instrumentation", + nargs="+", + default=[], + choices=InstrumentationFactory.options(), + help="Enable scheduler instrumentation (can specify multiple types).", + ) + dvg.add_argument( "--print-interval", "-pi", @@ -868,6 +878,9 @@ def main() -> None: FakeLauncher.max_parallel = args.max_parallel set_launcher_type(is_local=args.local, fake=args.fake) + # Configure scheduler instrumentation + set_instrumentation(InstrumentationFactory.create(args.instrumentation)) + # Build infrastructure from 
hjson file and create the list of items to # be deployed. cfg = make_cfg(args.cfg, args, proj_root) diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 7cbd59e7..149fec5b 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -15,6 +15,7 @@ import hjson +from dvsim import instrumentation from dvsim.flow.hjson import set_target_attribute from dvsim.job.data import CompletedJobStatus from dvsim.launcher.factory import get_launcher_cls @@ -152,9 +153,13 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None: self._expand() # Construct the path variables after variable expansion. - self.results_dir = Path(self.scratch_base_path) / "reports" / self.rel_path + reports_dir = Path(self.scratch_base_path) / "reports" + self.results_dir = reports_dir / self.rel_path self.results_page = self.results_dir / self.results_html_name + # Configure the report path for instrumentation + instrumentation.set_report_path(reports_dir / "metrics.json") + # Run any final checks self._post_init() diff --git a/src/dvsim/instrumentation/__init__.py b/src/dvsim/instrumentation/__init__.py new file mode 100644 index 00000000..b88c46a6 --- /dev/null +++ b/src/dvsim/instrumentation/__init__.py @@ -0,0 +1,53 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""DVSim Scheduler Instrumentation.""" + +from dvsim.instrumentation.base import ( + CompositeInstrumentation, + InstrumentationFragment, + InstrumentationFragments, + JobFragment, + NoOpInstrumentation, + SchedulerFragment, + SchedulerInstrumentation, + merge_instrumentation_report, +) +from dvsim.instrumentation.factory import InstrumentationFactory +from dvsim.instrumentation.metadata import MetadataInstrumentation, MetadataJobFragment +from dvsim.instrumentation.resources import ( + ResourceInstrumentation, + ResourceJobFragment, + ResourceSchedulerFragment, +) +from dvsim.instrumentation.runtime import flush, get, set_instrumentation, set_report_path +from dvsim.instrumentation.timing import ( + TimingInstrumentation, + TimingJobFragment, + TimingSchedulerFragment, +) + +__all__ = ( + "CompositeInstrumentation", + "InstrumentationFactory", + "InstrumentationFragment", + "InstrumentationFragments", + "JobFragment", + "MetadataInstrumentation", + "MetadataJobFragment", + "NoOpInstrumentation", + "ResourceInstrumentation", + "ResourceJobFragment", + "ResourceSchedulerFragment", + "SchedulerFragment", + "SchedulerInstrumentation", + "TimingInstrumentation", + "TimingJobFragment", + "TimingSchedulerFragment", + "flush", + "get", + "merge_instrumentation_report", + "set_instrumentation", + "set_report_path", +) diff --git a/src/dvsim/instrumentation/base.py b/src/dvsim/instrumentation/base.py new file mode 100644 index 00000000..b6d0804e --- /dev/null +++ b/src/dvsim/instrumentation/base.py @@ -0,0 +1,224 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation base classes.""" + +import json +from collections.abc import Collection, Iterable, Sequence +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any, TypeAlias + +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus +from dvsim.logging import log + +__all__ = ( + "CompositeInstrumentation", + "InstrumentationFragment", + "InstrumentationFragments", + "JobFragment", + "NoOpInstrumentation", + "SchedulerFragment", + "SchedulerInstrumentation", + "merge_instrumentation_report", +) + + +@dataclass +class InstrumentationFragment: + """Base class for instrumentation reports / report fragments.""" + + def to_dict(self) -> dict[str, Any]: + """Convert the report fragment to a dictionary.""" + return asdict(self) + + +@dataclass +class SchedulerFragment(InstrumentationFragment): + """Base class for instrumentation report fragments related to the scheduler.""" + + +@dataclass +class JobFragment(InstrumentationFragment): + """Base class for instrumentation report fragments related to individual jobs.""" + + job: JobSpec + + +def merge_instrumentation_report( + scheduler_fragments: Collection[SchedulerFragment], job_fragments: Collection[JobFragment] +) -> dict[str, Any]: + """Merge multiple instrumentation report fragments into a combined dictionary. + + When using multiple instrumentation mechanisms, this combines relevant per-job and global + scheduler information into one common interface, to make the output more readable. 
+ """ + log.info("Merging instrumentation report data...") + + # Merge information related to the scheduler + scheduler: dict[str, Any] = {} + for i, scheduler_frag in enumerate(scheduler_fragments, start=1): + log.debug( + "Merging instrumentation report scheduler data (%d/%d)", i, len(scheduler_fragments) + ) + scheduler.update(scheduler_frag.to_dict()) + + # Merge information related to specific jobs + jobs: dict[tuple[str, str], dict[str, Any]] = {} + for i, job_frag in enumerate(job_fragments, start=1): + log.debug("Merging instrumentation report job data (%d/%d)", i, len(job_fragments)) + spec = job_frag.job + # We can uniquely identify jobs from the combination of their full name & target + job_id = (spec.full_name, spec.target) + job = jobs.get(job_id) + if job is None: + job = {} + jobs[job_id] = job + job.update({k: v for k, v in job_frag.to_dict().items() if k != "job"}) + + log.info("Finished merging instrumentation report data.") + return {"scheduler": scheduler, "jobs": list(jobs.values())} + + +# Each instrumentation object can report any number of information fragments about the +# scheduler and about its jobs. +InstrumentationFragments: TypeAlias = tuple[Sequence[SchedulerFragment], Sequence[JobFragment]] + + +class SchedulerInstrumentation: + """Instrumentation for the scheduler. + + Base class for scheduler instrumentation, recording a variety of performance and + behavioural metrics for analysis. 
+ """ + + @property + def name(self) -> str: + """The name to use to refer to this instrumentation mechanism.""" + return self.__class__.__name__ + + def start(self) -> None: + """Begin instrumentation, starting whatever is needed before the scheduler is run.""" + log.info("Starting instrumentation: %s", self.name) + self._start() + + def _start(self) -> None: + return None + + def stop(self) -> None: + """Stop instrumentation, ending any instrumentation-specific resources.""" + log.info("Stopping instrumentation: %s", self.name) + self._stop() + + def _stop(self) -> None: + return None + + def on_scheduler_start(self) -> None: + """Notify instrumentation that the scheduler has begun.""" + return + + def on_scheduler_end(self) -> None: + """Notify instrumentation that the scheduler has finished.""" + return + + def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: # noqa: ARG002 + """Notify instrumentation of a change in status for some scheduled job.""" + return + + def build_report_fragments(self) -> InstrumentationFragments | None: + """Build report fragments from the collected instrumentation information.""" + return None + + def build_report(self) -> dict[str, Any] | None: + """Build an instrumentation report dict containing collected instrumentation info.""" + log.info("Building instrumentation report...") + fragments = self.build_report_fragments() + return None if fragments is None else merge_instrumentation_report(*fragments) + + def dump_json_report(self, report_path: Path) -> None: + """Dump a given JSON instrumentation report to a specified file path.""" + report = self.build_report() + if not report: + return + log.info("Dumping JSON instrumentation report...") + if report_path.is_dir(): + raise ValueError("Metric report path cannot be a directory.") + try: + report_path.parent.mkdir(parents=True, exist_ok=True) + report_path.write_text(json.dumps(report, indent=2)) + log.info("JSON instrumentation report dumped to: %s", 
str(report_path)) + except (OSError, FileNotFoundError) as e: + log.error("Error writing instrumented metrics to %s: %s", str(report_path), str(e)) + + +class NoOpInstrumentation(SchedulerInstrumentation): + """Scheduler instrumentation which just does nothing.""" + + def start(self) -> None: + """Begin instrumentation, doing nothing (not even logging).""" + + def stop(self) -> None: + """End instrumentation, doing nothing (not even logging).""" + + def build_report(self) -> dict[str, Any] | None: + """Build an instrumentation report, doing nothing (not even logging).""" + return None + + +class CompositeInstrumentation(SchedulerInstrumentation): + """Composite instrumentation for combining several instrumentations to be used at once.""" + + def __init__(self, instrumentations: Iterable[SchedulerInstrumentation]) -> None: + """Construct an instrumentation object composed of many instrumentations. + + Arguments: + instrumentations: The list of instrumentations to compose. + + """ + super().__init__() + self._instrumentations = instrumentations + + @property + def name(self) -> str: + """The name to use to refer to this composed instrumentation.""" + composed = ", ".join(inst.name for inst in self._instrumentations) + return f"CompositeInstrumentation({composed})" + + def _start(self) -> None: + for inst in self._instrumentations: + inst.start() + + def _stop(self) -> None: + for inst in self._instrumentations: + inst.stop() + + def on_scheduler_start(self) -> None: + """Notify instrumentation that the scheduler has begun.""" + for inst in self._instrumentations: + inst.on_scheduler_start() + + def on_scheduler_end(self) -> None: + """Notify instrumentation that the scheduler has finished.""" + for inst in self._instrumentations: + inst.on_scheduler_end() + + def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: + """Notify instrumentation of a change in status for some scheduled job.""" + for inst in self._instrumentations: + 
inst.on_job_status_change(job, status) + + def build_report_fragments(self) -> InstrumentationFragments | None: + """Build report fragments from the collected instrumentation information.""" + scheduler_fragments = [] + job_fragments = [] + + for inst in self._instrumentations: + fragments = inst.build_report_fragments() + if fragments is None: + continue + scheduler_fragments += fragments[0] + job_fragments += fragments[1] + + return (scheduler_fragments, job_fragments) diff --git a/src/dvsim/instrumentation/factory.py b/src/dvsim/instrumentation/factory.py new file mode 100644 index 00000000..7ab24a32 --- /dev/null +++ b/src/dvsim/instrumentation/factory.py @@ -0,0 +1,55 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation factory.""" + +from typing import ClassVar + +from dvsim.instrumentation.base import ( + CompositeInstrumentation, + NoOpInstrumentation, + SchedulerInstrumentation, +) +from dvsim.instrumentation.metadata import MetadataInstrumentation +from dvsim.instrumentation.resources import ResourceInstrumentation +from dvsim.instrumentation.timing import TimingInstrumentation + +__all__ = ("InstrumentationFactory",) + + +class InstrumentationFactory: + """Factory/registry for scheduler instrumentation implementations.""" + + _registry: ClassVar[dict[str, type[SchedulerInstrumentation]]] = {} + + @classmethod + def register(cls, name: str, constructor: type[SchedulerInstrumentation]) -> None: + """Register a new scheduler instrumentation type.""" + cls._registry[name] = constructor + + @classmethod + def options(cls) -> list[str]: + """Get a list of available scheduler instrumentation types.""" + return list(cls._registry.keys()) + + @classmethod + def create(cls, names: list[str] | None) -> SchedulerInstrumentation: + """Create a scheduler instrumentation of the given types. 
+ + Arguments: + names: A list of registered instrumentation names to compose into a single + instrumentation object, or None / an empty list for no instrumentation. + + """ + if not names: + return NoOpInstrumentation() + + instances: list[SchedulerInstrumentation] = [MetadataInstrumentation()] + instances.extend([cls._registry[name]() for name in names]) + return CompositeInstrumentation(instances) + + +# Register implemented instrumentation mechanisms +InstrumentationFactory.register("timing", TimingInstrumentation) +InstrumentationFactory.register("resources", ResourceInstrumentation) diff --git a/src/dvsim/instrumentation/metadata.py b/src/dvsim/instrumentation/metadata.py new file mode 100644 index 00000000..d8dd6d51 --- /dev/null +++ b/src/dvsim/instrumentation/metadata.py @@ -0,0 +1,67 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation metadata (to be included in the generated report).""" + +from dataclasses import dataclass + +from dvsim.instrumentation.base import ( + InstrumentationFragments, + JobFragment, + SchedulerInstrumentation, +) +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus + +__all__ = ( + "MetadataInstrumentation", + "MetadataJobFragment", +) + + +@dataclass +class MetadataJobFragment(JobFragment): + """Instrumentation metadata for scheduled jobs, reporting the final status of the job.""" + + name: str + full_name: str + job_type: str + target: str + dependencies: list[str] + status: str + + +class MetadataInstrumentation(SchedulerInstrumentation): + """Metadata instrumentation for the scheduler. + + Collects basic metadata about jobs (job spec and status) that are useful to include as + part of the instrumentation report for analysis, regardless of other instrumentations. 
+ """ + + def __init__(self) -> None: + """Construct a `MetadataInstrumentation`.""" + super().__init__() + self._jobs: dict[tuple[str, str], tuple[JobSpec, str]] = {} + + def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: + """Notify instrumentation of a change in status for some scheduled job.""" + status_str = status.name.capitalize() + job_id = (job.full_name, job.target) + self._jobs[job_id] = (job, status_str) + + def build_report_fragments(self) -> InstrumentationFragments | None: + """Build report fragments from the collected instrumentation information.""" + jobs = [ + MetadataJobFragment( + spec, + spec.name, + spec.full_name, + spec.job_type, + spec.target, + spec.dependencies, + status_str, + ) + for spec, status_str in self._jobs.values() + ] + return ([], jobs) diff --git a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py new file mode 100644 index 00000000..f010f01a --- /dev/null +++ b/src/dvsim/instrumentation/resources.py @@ -0,0 +1,281 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation for system resource usage.""" + +import os +import threading +import time +from dataclasses import dataclass +from typing import TypeAlias + +import psutil + +from dvsim.instrumentation.base import ( + InstrumentationFragments, + JobFragment, + SchedulerFragment, + SchedulerInstrumentation, +) +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus + +__all__ = ( + "ResourceInstrumentation", + "ResourceJobFragment", + "ResourceSchedulerFragment", +) + + +@dataclass +class ResourceSchedulerFragment(SchedulerFragment): + """Instrumented metrics about the scheduler reported by the `ResourceInstrumentation`.""" + + # Scheduler / DVSim process overhead + scheduler_rss_bytes: int | None = None + scheduler_vms_bytes: int | None = None + scheduler_cpu_percent: float | None = None + scheduler_cpu_time: float | None = None + + # System-wide metrics + sys_rss_bytes: int | None = None + sys_swap_used_bytes: int | None = None + sys_cpu_percent: float | None = None + sys_cpu_per_core: list[float] | None = None + + num_resource_samples: int = 0 + + +@dataclass +class ResourceJobFragment(JobFragment): + """Instrumented metrics about jobs reported by the `ResourceInstrumentation`. + + Since we can't directly measure each deployed job, these are instead averages and system + information over the course of the job's runtime. + """ + + max_rss_bytes: int | None = None + avg_rss_bytes: float | None = None + avg_cpu_percent: float | None = None + avg_cpu_per_core: list[float] | None = None + + num_resource_samples: int = 0 + + +class JobResourceAggregate: + """Resource Instrumentation aggregation for a single deployed job. + + Tracks aggregate information over a number of samples whilst minimizing memory usage. + """ + + def __init__(self, job: JobSpec, num_cores: int | None) -> None: + """Construct an aggregate for storing sampling info for a given job specification. 
+ + Arguments: + job: The specification of the job which is having its information aggregated. + num_cores: The number of logical CPU cores available on the host system. + + """ + self.job_spec = job + self.num_cores = num_cores + self.sample_count = 0 + self.sum_rss = 0.0 + self.max_rss = 0 + self.sum_cpu = 0.0 + if num_cores: + self.sum_cpu_per_core = [0.0] * num_cores + + def add_sample(self, rss: int, cpu: float, cpu_per_core: list[float]) -> None: + """Aggregate an additional resource sample taken during this job's active window.""" + self.sample_count += 1 + self.sum_rss += rss + self.max_rss = max(self.max_rss, rss) + self.sum_cpu += cpu + if self.num_cores: + self.sum_cpu_per_core = [ + total + n for total, n in zip(self.sum_cpu_per_core, cpu_per_core, strict=True) + ] + + def finalize(self) -> ResourceJobFragment: + """Finalize the aggregated information for a job, generating a report fragment.""" + if self.sample_count == 0: + return ResourceJobFragment(self.job_spec) + + if self.num_cores: + avg_cpu_per_core = [x / self.sample_count for x in self.sum_cpu_per_core] + else: + avg_cpu_per_core = None + + return ResourceJobFragment( + self.job_spec, + max_rss_bytes=self.max_rss, + avg_rss_bytes=self.sum_rss / self.sample_count, + avg_cpu_percent=self.sum_cpu / self.sample_count, + avg_cpu_per_core=avg_cpu_per_core, + num_resource_samples=self.sample_count, + ) + + +# Unique identifier to disambiguate a job (full_name, target) +JobId: TypeAlias = tuple[str, str] + + +class ResourceInstrumentation(SchedulerInstrumentation): + """Resource instrumentation for the scheduler. + + Collects information about the compute resources used throughout the entire duration of + the scheduler, as well as during the window within which each job is dispatched. This + includes memory usage (max & avg RSS bytes), virtual memory (VMS bytes), swap usage, CPU + time and per-core CPU utilisation. 
+ + Since we have no access to job sub-processes, per-job instrumentation is the aggregate + of the samples that fall within that job's execution window. + """ + + def __init__(self, sample_interval: float = 0.5) -> None: + """Construct a resource instrumentation. + + Arguments: + sample_interval: The period (in seconds) per poll / sample produced. + + """ + self.sample_interval = sample_interval + self._running = False + self._thread: threading.Thread | None = None + self._lock = threading.Lock() + + self._scheduler_process = psutil.Process(os.getpid()) + self._num_cores = psutil.cpu_count(logical=True) + self._sample_count = 0 + + # Scheduler (DVSim process) / System Memory usage + self._scheduler_sum_rss = 0 + self._scheduler_max_rss = 0 + self._scheduler_sum_vms = 0 + self._sys_max_rss = 0 + self._sys_max_swap = 0 + + # Scheduler (DVSim process) / System CPU usage + self._scheduler_cpu_time_start = 0 + self._scheduler_cpu_time_end = 0 + self._scheduler_sum_cpu = 0 + self._sys_sum_cpu = 0 + if self._num_cores is not None: + self._sys_sum_cpu_per_core = [0] * self._num_cores + + # Job aggregate metrics + self._running_jobs: dict[JobId, JobResourceAggregate] = {} + self._finished_jobs: dict[JobId, JobResourceAggregate] = {} + + def _scheduler_cpu_time(self) -> float | int: + """Get the CPU time of the scheduler process. + + Includes user mode time and system time (kernel mode). Excludes the user & system time + of child processes and any iowait time spent for blocking I/O to complete. 
+ """ + return sum(self._scheduler_process.cpu_times()[:2]) + + def _start(self) -> None: + """Start system-wide sampling in the background on another thread.""" + self._running = True + self._scheduler_process.cpu_percent(None) # Start measuring + self._thread = threading.Thread(target=self._sampling_loop, daemon=True) + self._thread.start() + + def _stop(self) -> None: + self._running = False + if self._thread: + self._thread.join() + + def _sampling_loop(self) -> None: + next_run_at = time.time() + while self._running: + next_run_at += self.sample_interval + + scheduler_memory_info = self._scheduler_process.memory_info() + sys_rss = psutil.virtual_memory().used + sys_cpu = psutil.cpu_percent(None) + sys_cpu_per_core = psutil.cpu_percent(percpu=True) + + # Update scheduler aggregates + self._sample_count += 1 + self._scheduler_sum_rss += scheduler_memory_info.rss + self._scheduler_sum_vms += scheduler_memory_info.vms + self._scheduler_sum_cpu += self._scheduler_process.cpu_percent(None) + self._scheduler_max_rss = max(self._scheduler_max_rss, scheduler_memory_info.rss) + + # Update system-wide metrics + self._sys_max_swap = max(self._sys_max_swap, psutil.swap_memory().used) + self._sys_max_rss = max(self._sys_max_rss, sys_rss) + self._sys_sum_cpu += sys_cpu + if self._num_cores is not None: + self._sys_sum_cpu_per_core = [ + total + n + for total, n in zip(self._sys_sum_cpu_per_core, sys_cpu_per_core, strict=True) + ] + + # Update all running job aggregates with system sample + with self._lock: + for aggregate in self._running_jobs.values(): + aggregate.add_sample(sys_rss, sys_cpu, sys_cpu_per_core) + + sleep_time = max(next_run_at - time.time(), 0) + time.sleep(sleep_time) + + def on_scheduler_start(self) -> None: + """Notify instrumentation that the scheduler has begun.""" + self._scheduler_cpu_time_start = self._scheduler_cpu_time() + + def on_scheduler_end(self) -> None: + """Notify instrumentation that the scheduler has finished.""" + 
self._scheduler_cpu_time_end = self._scheduler_cpu_time() + + def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: + """Notify instrumentation of a change in status for some scheduled job.""" + job_id = (job.full_name, job.target) + + with self._lock: + running = job_id in self._running_jobs + started = running or job_id in self._finished_jobs + if not started and status != JobStatus.QUEUED: + self._running_jobs[job_id] = JobResourceAggregate(job, self._num_cores) + running = True + if running and status.ended: + aggregates = self._running_jobs.pop(job_id) + self._finished_jobs[job_id] = aggregates + + def build_report_fragments(self) -> InstrumentationFragments | None: + """Build report fragments from the collected instrumentation information.""" + if self._running: + raise RuntimeError("Cannot build instrumentation report whilst still running!") + + if self._sample_count <= 0: + scheduler_frag = ResourceSchedulerFragment() + else: + scheduler_cpu_time = self._scheduler_cpu_time_end - self._scheduler_cpu_time_start + if self._num_cores is not None: + sys_cpu_per_core = [s / self._sample_count for s in self._sys_sum_cpu_per_core] + else: + sys_cpu_per_core = None + try: + vms_bytes = round(self._scheduler_sum_vms / self._sample_count) + except (ValueError, TypeError): + # Suppress unknown types in VMS measurements + vms_bytes = None + + scheduler_frag = ResourceSchedulerFragment( + scheduler_rss_bytes=self._scheduler_max_rss, + scheduler_vms_bytes=vms_bytes, + scheduler_cpu_percent=self._scheduler_sum_cpu / self._sample_count, + scheduler_cpu_time=scheduler_cpu_time, + sys_rss_bytes=self._sys_max_rss, + sys_cpu_percent=self._sys_sum_cpu / self._sample_count, + sys_cpu_per_core=sys_cpu_per_core, + sys_swap_used_bytes=self._sys_max_swap, + num_resource_samples=self._sample_count, + ) + + aggregates = list(self._finished_jobs.values()) + list(self._running_jobs.values()) + job_frags = [aggregate.finalize() for aggregate in aggregates] + return 
([scheduler_frag], job_frags) diff --git a/src/dvsim/instrumentation/runtime.py b/src/dvsim/instrumentation/runtime.py new file mode 100644 index 00000000..c7d90e5d --- /dev/null +++ b/src/dvsim/instrumentation/runtime.py @@ -0,0 +1,47 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation global runtime state (configured instrumentation and report path).""" + +from pathlib import Path + +from dvsim.instrumentation.base import SchedulerInstrumentation + +__all__ = ( + "flush", + "get", + "set_instrumentation", + "set_report_path", +) + + +class _Runtime: + def __init__(self) -> None: + self.instrumentation: SchedulerInstrumentation | None = None + self.report_path: Path | None = None + + +_runtime = _Runtime() + + +def set_instrumentation(instrumentation: SchedulerInstrumentation | None) -> None: + """Configure the global instrumentation singleton.""" + _runtime.instrumentation = instrumentation + + +def set_report_path(path: Path | None) -> None: + """Configure the instrumentation report path.""" + _runtime.report_path = path + + +def get() -> SchedulerInstrumentation | None: + """Get the configured global instrumentation.""" + return _runtime.instrumentation + + +def flush() -> None: + """Dump the instrumentation report as JSON to the configured report path.""" + if _runtime.instrumentation is None or not _runtime.report_path: + return + _runtime.instrumentation.dump_json_report(_runtime.report_path) diff --git a/src/dvsim/instrumentation/timing.py b/src/dvsim/instrumentation/timing.py new file mode 100644 index 00000000..187413c6 --- /dev/null +++ b/src/dvsim/instrumentation/timing.py @@ -0,0 +1,109 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""DVSim scheduler instrumentation for timing-related information.""" + +import time +from dataclasses import asdict, dataclass +from typing import Any + +from dvsim.instrumentation.base import ( + InstrumentationFragments, + JobFragment, + SchedulerFragment, + SchedulerInstrumentation, +) +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus + +__all__ = ( + "TimingInstrumentation", + "TimingJobFragment", + "TimingSchedulerFragment", +) + + +@dataclass +class TimingSchedulerFragment(SchedulerFragment): + """Instrumented metrics about the scheduler reported by the `TimingInstrumentation`.""" + + start_time: float | None = None + end_time: float | None = None + + @property + def duration(self) -> float | None: + """The duration of the entire scheduler run.""" + if self.start_time is None or self.end_time is None: + return None + return self.end_time - self.start_time + + def to_dict(self) -> dict[str, Any]: + """Convert the scheduler metrics to a dictionary, including the `duration` property.""" + data = asdict(self) + duration = self.duration + if duration: + data["duration"] = duration + return data + + +@dataclass +class TimingJobFragment(JobFragment): + """Instrumented metrics about the scheduler reported by the `TimingInstrumentation`.""" + + start_time: float | None = None + end_time: float | None = None + + @property + def duration(self) -> float | None: + """The duration of the job.""" + if self.start_time is None or self.end_time is None: + return None + return self.end_time - self.start_time + + def to_dict(self) -> dict[str, Any]: + """Convert the job metrics to a dictionary, including the `duration` property.""" + data = asdict(self) + duration = self.duration + if duration: + data["duration"] = duration + return data + + +class TimingInstrumentation(SchedulerInstrumentation): + """Timing instrumentation for the scheduler. 
+ + Collects information about the start time, end time and duration of the scheduler itself and + all of the jobs that it dispatches. + """ + + def __init__(self) -> None: + """Construct a `TimingInstrumentation`.""" + super().__init__() + self._scheduler = TimingSchedulerFragment() + self._jobs: dict[tuple[str, str], TimingJobFragment] = {} + + def on_scheduler_start(self) -> None: + """Notify instrumentation that the scheduler has begun.""" + self._scheduler.start_time = time.time() + + def on_scheduler_end(self) -> None: + """Notify instrumentation that the scheduler has finished.""" + self._scheduler.end_time = time.time() + + def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: + """Notify instrumentation of a change in status for some scheduled job.""" + job_id = (job.full_name, job.target) + job_info = self._jobs.get(job_id) + if job_info is None: + job_info = TimingJobFragment(job) + self._jobs[job_id] = job_info + + if job_info.start_time is None and status != JobStatus.QUEUED: + job_info.start_time = time.time() + if status.ended: + job_info.end_time = time.time() + + def build_report_fragments(self) -> InstrumentationFragments | None: + """Build report fragments from the collected instrumentation information.""" + return ([self._scheduler], list(self._jobs.values())) diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index 766778aa..2b66169b 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -20,6 +20,8 @@ from types import FrameType from typing import TYPE_CHECKING, Any +from dvsim import instrumentation +from dvsim.instrumentation import NoOpInstrumentation from dvsim.job.data import CompletedJobStatus, JobSpec from dvsim.job.status import JobStatus from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError @@ -92,6 +94,11 @@ def __init__( interactive: launch the tools in interactive mode. 
""" + # Start any instrumentation + inst = instrumentation.get() + self._instrumentation = NoOpInstrumentation() if inst is None else inst + self._instrumentation.start() + self._jobs: Mapping[str, JobSpec] = {i.full_name: i for i in items} # 'scheduled[target][cfg]' is a list of JobSpec object names for the chosen @@ -195,6 +202,8 @@ def run(self) -> Sequence[CompletedJobStatus]: Returns the results (status) of all items dispatched for all targets and cfgs. """ + # Notify instrumentation that the scheduler started + self._instrumentation.on_scheduler_start() timer = Timer() # On SIGTERM or SIGINT, tell the runner to quit. @@ -243,6 +252,11 @@ def on_signal(signal_received: int, _: FrameType | None) -> None: # Cleanup the status printer. self._status_printer.exit() + # Finish instrumentation and generate the instrumentation report + self._instrumentation.on_scheduler_end() + self._instrumentation.stop() + instrumentation.flush() + # We got to the end without anything exploding. Return the results. results = [] for name, status in self.job_status.items(): @@ -502,14 +516,17 @@ def _poll(self, hms: str) -> bool: if status == JobStatus.PASSED: self._passed[target].add(job_name) + self._instrumentation.on_job_status_change(self._jobs[job_name], status) elif status == JobStatus.FAILED: self._failed[target].add(job_name) + self._instrumentation.on_job_status_change(self._jobs[job_name], status) level = log.ERROR else: # Killed, still Queued, or some error when dispatching. 
self._killed[target].add(job_name) + self._instrumentation.on_job_status_change(self._jobs[job_name], status.KILLED) level = log.ERROR self._running[target].pop(self._last_item_polled_idx[target]) @@ -559,7 +576,7 @@ def _dispatch_job(self, hms: str, target: str, job_name: str) -> None: self._queued[target].append(job_name) log.verbose( - "[%s]: [%s]: [reqeued]: %s", + "[%s]: [%s]: [requeued]: %s", hms, target, job_name, @@ -568,6 +585,7 @@ def _dispatch_job(self, hms: str, target: str, job_name: str) -> None: self._running[target].append(job_name) self.job_status[job_name] = JobStatus.DISPATCHED + self._instrumentation.on_job_status_change(self._jobs[job_name], JobStatus.DISPATCHED) def _dispatch(self, hms: str) -> None: """Dispatch some queued items if possible. @@ -708,11 +726,12 @@ def _cancel_item(self, job_name: str, *, cancel_successors: bool = True) -> None cancel_successors: if set then cancel successors as well (True). """ - target = self._jobs[job_name].target + job = self._jobs[job_name] self.job_status[job_name] = JobStatus.KILLED - self._killed[target].add(job_name) - if job_name in self._queued[target]: - self._queued[target].remove(job_name) + self._killed[job.target].add(job_name) + self._instrumentation.on_job_status_change(job, JobStatus.KILLED) + if job_name in self._queued[job.target]: + self._queued[job.target].remove(job_name) else: self._unschedule_item(job_name) @@ -726,9 +745,10 @@ def _kill_item(self, job_name: str) -> None: job_name: name of the job to kill """ - target = self._jobs[job_name].target + job = self._jobs[job_name] self._launchers[job_name].kill() self.job_status[job_name] = JobStatus.KILLED - self._killed[target].add(job_name) - self._running[target].remove(job_name) + self._killed[job.target].add(job_name) + self._instrumentation.on_job_status_change(job, JobStatus.KILLED) + self._running[job.target].remove(job_name) self._cancel_successors(job_name) diff --git a/uv.lock b/uv.lock index b05cbd4f..131c3d34 100644 --- 
a/uv.lock +++ b/uv.lock @@ -322,6 +322,7 @@ dependencies = [ { name = "hjson" }, { name = "jinja2" }, { name = "logzero" }, + { name = "psutil" }, { name = "pydantic" }, { name = "pyyaml" }, { name = "tabulate" }, @@ -400,6 +401,7 @@ requires-dist = [ { name = "ipython", marker = "extra == 'debug'", specifier = ">=8.18.1" }, { name = "jinja2", specifier = ">=3.1.6" }, { name = "logzero", specifier = ">=1.7.0" }, + { name = "psutil", specifier = ">=7.2.2" }, { name = "pydantic", specifier = ">=2.9.2" }, { name = "pyhamcrest", marker = "extra == 'test'", specifier = ">=2.1.0" }, { name = "pyright", marker = "extra == 'typing'", specifier = ">=1.1.381" }, @@ -816,6 +818,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/84/03/0d3ce49e2505ae70cf43bc5bb3033955d2fc9f932163e84dc0779cc47f48/prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955", size = 391431, upload-time = "2025-08-27T15:23:59.498Z" }, ] +[[package]] +name = "psutil" +version = "7.2.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/aa/c6/d1ddf4abb55e93cebc4f2ed8b5d6dbad109ecb8d63748dd2b20ab5e57ebe/psutil-7.2.2.tar.gz", hash = "sha256:0746f5f8d406af344fd547f1c8daa5f5c33dbc293bb8d6a16d80b4bb88f59372", size = 493740, upload-time = "2026-01-28T18:14:54.428Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/51/08/510cbdb69c25a96f4ae523f733cdc963ae654904e8db864c07585ef99875/psutil-7.2.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:2edccc433cbfa046b980b0df0171cd25bcaeb3a68fe9022db0979e7aa74a826b", size = 130595, upload-time = "2026-01-28T18:14:57.293Z" }, + { url = "https://files.pythonhosted.org/packages/d6/f5/97baea3fe7a5a9af7436301f85490905379b1c6f2dd51fe3ecf24b4c5fbf/psutil-7.2.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:e78c8603dcd9a04c7364f1a3e670cea95d51ee865e4efb3556a3a63adef958ea", size = 131082, upload-time = 
"2026-01-28T18:14:59.732Z" }, + { url = "https://files.pythonhosted.org/packages/37/d6/246513fbf9fa174af531f28412297dd05241d97a75911ac8febefa1a53c6/psutil-7.2.2-cp313-cp313t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1a571f2330c966c62aeda00dd24620425d4b0cc86881c89861fbc04549e5dc63", size = 181476, upload-time = "2026-01-28T18:15:01.884Z" }, + { url = "https://files.pythonhosted.org/packages/b8/b5/9182c9af3836cca61696dabe4fd1304e17bc56cb62f17439e1154f225dd3/psutil-7.2.2-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:917e891983ca3c1887b4ef36447b1e0873e70c933afc831c6b6da078ba474312", size = 184062, upload-time = "2026-01-28T18:15:04.436Z" }, + { url = "https://files.pythonhosted.org/packages/16/ba/0756dca669f5a9300d0cbcbfae9a4c30e446dfc7440ffe43ded5724bfd93/psutil-7.2.2-cp313-cp313t-win_amd64.whl", hash = "sha256:ab486563df44c17f5173621c7b198955bd6b613fb87c71c161f827d3fb149a9b", size = 139893, upload-time = "2026-01-28T18:15:06.378Z" }, + { url = "https://files.pythonhosted.org/packages/1c/61/8fa0e26f33623b49949346de05ec1ddaad02ed8ba64af45f40a147dbfa97/psutil-7.2.2-cp313-cp313t-win_arm64.whl", hash = "sha256:ae0aefdd8796a7737eccea863f80f81e468a1e4cf14d926bd9b6f5f2d5f90ca9", size = 135589, upload-time = "2026-01-28T18:15:08.03Z" }, + { url = "https://files.pythonhosted.org/packages/81/69/ef179ab5ca24f32acc1dac0c247fd6a13b501fd5534dbae0e05a1c48b66d/psutil-7.2.2-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:eed63d3b4d62449571547b60578c5b2c4bcccc5387148db46e0c2313dad0ee00", size = 130664, upload-time = "2026-01-28T18:15:09.469Z" }, + { url = "https://files.pythonhosted.org/packages/7b/64/665248b557a236d3fa9efc378d60d95ef56dd0a490c2cd37dafc7660d4a9/psutil-7.2.2-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:7b6d09433a10592ce39b13d7be5a54fbac1d1228ed29abc880fb23df7cb694c9", size = 131087, upload-time = "2026-01-28T18:15:11.724Z" }, + { url = 
"https://files.pythonhosted.org/packages/d5/2e/e6782744700d6759ebce3043dcfa661fb61e2fb752b91cdeae9af12c2178/psutil-7.2.2-cp314-cp314t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1fa4ecf83bcdf6e6c8f4449aff98eefb5d0604bf88cb883d7da3d8d2d909546a", size = 182383, upload-time = "2026-01-28T18:15:13.445Z" }, + { url = "https://files.pythonhosted.org/packages/57/49/0a41cefd10cb7505cdc04dab3eacf24c0c2cb158a998b8c7b1d27ee2c1f5/psutil-7.2.2-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e452c464a02e7dc7822a05d25db4cde564444a67e58539a00f929c51eddda0cf", size = 185210, upload-time = "2026-01-28T18:15:16.002Z" }, + { url = "https://files.pythonhosted.org/packages/dd/2c/ff9bfb544f283ba5f83ba725a3c5fec6d6b10b8f27ac1dc641c473dc390d/psutil-7.2.2-cp314-cp314t-win_amd64.whl", hash = "sha256:c7663d4e37f13e884d13994247449e9f8f574bc4655d509c3b95e9ec9e2b9dc1", size = 141228, upload-time = "2026-01-28T18:15:18.385Z" }, + { url = "https://files.pythonhosted.org/packages/f2/fc/f8d9c31db14fcec13748d373e668bc3bed94d9077dbc17fb0eebc073233c/psutil-7.2.2-cp314-cp314t-win_arm64.whl", hash = "sha256:11fe5a4f613759764e79c65cf11ebdf26e33d6dd34336f8a337aa2996d71c841", size = 136284, upload-time = "2026-01-28T18:15:19.912Z" }, + { url = "https://files.pythonhosted.org/packages/e7/36/5ee6e05c9bd427237b11b3937ad82bb8ad2752d72c6969314590dd0c2f6e/psutil-7.2.2-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:ed0cace939114f62738d808fdcecd4c869222507e266e574799e9c0faa17d486", size = 129090, upload-time = "2026-01-28T18:15:22.168Z" }, + { url = "https://files.pythonhosted.org/packages/80/c4/f5af4c1ca8c1eeb2e92ccca14ce8effdeec651d5ab6053c589b074eda6e1/psutil-7.2.2-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:1a7b04c10f32cc88ab39cbf606e117fd74721c831c98a27dc04578deb0c16979", size = 129859, upload-time = "2026-01-28T18:15:23.795Z" }, + { url = 
"https://files.pythonhosted.org/packages/b5/70/5d8df3b09e25bce090399cf48e452d25c935ab72dad19406c77f4e828045/psutil-7.2.2-cp36-abi3-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:076a2d2f923fd4821644f5ba89f059523da90dc9014e85f8e45a5774ca5bc6f9", size = 155560, upload-time = "2026-01-28T18:15:25.976Z" }, + { url = "https://files.pythonhosted.org/packages/63/65/37648c0c158dc222aba51c089eb3bdfa238e621674dc42d48706e639204f/psutil-7.2.2-cp36-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b0726cecd84f9474419d67252add4ac0cd9811b04d61123054b9fb6f57df6e9e", size = 156997, upload-time = "2026-01-28T18:15:27.794Z" }, + { url = "https://files.pythonhosted.org/packages/8e/13/125093eadae863ce03c6ffdbae9929430d116a246ef69866dad94da3bfbc/psutil-7.2.2-cp36-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:fd04ef36b4a6d599bbdb225dd1d3f51e00105f6d48a28f006da7f9822f2606d8", size = 148972, upload-time = "2026-01-28T18:15:29.342Z" }, + { url = "https://files.pythonhosted.org/packages/04/78/0acd37ca84ce3ddffaa92ef0f571e073faa6d8ff1f0559ab1272188ea2be/psutil-7.2.2-cp36-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:b58fabe35e80b264a4e3bb23e6b96f9e45a3df7fb7eed419ac0e5947c61e47cc", size = 148266, upload-time = "2026-01-28T18:15:31.597Z" }, + { url = "https://files.pythonhosted.org/packages/b4/90/e2159492b5426be0c1fef7acba807a03511f97c5f86b3caeda6ad92351a7/psutil-7.2.2-cp37-abi3-win_amd64.whl", hash = "sha256:eb7e81434c8d223ec4a219b5fc1c47d0417b12be7ea866e24fb5ad6e84b3d988", size = 137737, upload-time = "2026-01-28T18:15:33.849Z" }, + { url = "https://files.pythonhosted.org/packages/8c/c7/7bb2e321574b10df20cbde462a94e2b71d05f9bbda251ef27d104668306a/psutil-7.2.2-cp37-abi3-win_arm64.whl", hash = "sha256:8c233660f575a5a89e6d4cb65d9f938126312bca76d8fe087b947b3a1aaac9ee", size = 134617, upload-time = "2026-01-28T18:15:36.514Z" }, +] + [[package]] name = "ptyprocess" version = "0.7.0"