Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies = [
"hjson>=3.1.0",
"jinja2>=3.1.6",
"logzero>=1.7.0",
"psutil>=7.2.2",
"pydantic>=2.9.2",
"pyyaml>=6.0.2",
"tabulate>=0.9.0",
Expand Down
6 changes: 6 additions & 0 deletions ruff-ci.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ ignore = [
"B008", # function calls in default arg value
"PERF203", # Exception handling in loops

# Some noqas might be needed for compliance with full Ruff configuration,
# but not with the temporary ignores listed below in the CI configuration.
# To stop CI telling us we need to remove these noqas, temporarily disable
# the rule enforcing this in CI.
"RUF100",

# Temporary ignores just for CI until the rules pass; once they do, they can be
# removed from here to prevent future regressions.
"ANN001",
Expand Down
13 changes: 13 additions & 0 deletions src/dvsim/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from pathlib import Path

from dvsim.flow.factory import make_cfg
from dvsim.instrumentation import InstrumentationFactory, set_instrumentation
from dvsim.job.deploy import RunTest
from dvsim.launcher.base import Launcher
from dvsim.launcher.factory import set_launcher_type
Expand Down Expand Up @@ -755,6 +756,15 @@ def parse_args():

dvg = parser.add_argument_group("Controlling DVSim itself")

dvg.add_argument(
"--instrument",
dest="instrumentation",
nargs="+",
default=[],
choices=InstrumentationFactory.options(),
help="Enable scheduler instrumentation (can specify multiple types).",
)

dvg.add_argument(
"--print-interval",
"-pi",
Expand Down Expand Up @@ -868,6 +878,9 @@ def main() -> None:
FakeLauncher.max_parallel = args.max_parallel
set_launcher_type(is_local=args.local, fake=args.fake)

# Configure scheduler instrumentation
set_instrumentation(InstrumentationFactory.create(args.instrumentation))

# Build infrastructure from hjson file and create the list of items to
# be deployed.
cfg = make_cfg(args.cfg, args, proj_root)
Expand Down
7 changes: 6 additions & 1 deletion src/dvsim/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import hjson

from dvsim import instrumentation
from dvsim.flow.hjson import set_target_attribute
from dvsim.job.data import CompletedJobStatus
from dvsim.launcher.factory import get_launcher_cls
Expand Down Expand Up @@ -152,9 +153,13 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None:
self._expand()

# Construct the path variables after variable expansion.
self.results_dir = Path(self.scratch_base_path) / "reports" / self.rel_path
reports_dir = Path(self.scratch_base_path) / "reports"
self.results_dir = reports_dir / self.rel_path
self.results_page = self.results_dir / self.results_html_name

# Configure the report path for instrumentation
instrumentation.set_report_path(reports_dir / "metrics.json")

# Run any final checks
self._post_init()

Expand Down
53 changes: 53 additions & 0 deletions src/dvsim/instrumentation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright lowRISC contributors (OpenTitan project).
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

"""DVSim Scheduler Instrumentation."""

from dvsim.instrumentation.base import (
CompositeInstrumentation,
InstrumentationFragment,
InstrumentationFragments,
JobFragment,
NoOpInstrumentation,
SchedulerFragment,
SchedulerInstrumentation,
merge_instrumentation_report,
)
from dvsim.instrumentation.factory import InstrumentationFactory
from dvsim.instrumentation.metadata import MetadataInstrumentation, MetadataJobFragment
from dvsim.instrumentation.resources import (
ResourceInstrumentation,
ResourceJobFragment,
ResourceSchedulerFragment,
)
from dvsim.instrumentation.runtime import flush, get, set_instrumentation, set_report_path
from dvsim.instrumentation.timing import (
TimingInstrumentation,
TimingJobFragment,
TimingSchedulerFragment,
)

# Public API of the instrumentation package: the names re-exported from the
# base, factory, metadata, resources, runtime and timing submodules imported above.
__all__ = (
"CompositeInstrumentation",
"InstrumentationFactory",
"InstrumentationFragment",
"InstrumentationFragments",
"JobFragment",
"MetadataInstrumentation",
"MetadataJobFragment",
"NoOpInstrumentation",
"ResourceInstrumentation",
"ResourceJobFragment",
"ResourceSchedulerFragment",
"SchedulerFragment",
"SchedulerInstrumentation",
"TimingInstrumentation",
"TimingJobFragment",
"TimingSchedulerFragment",
"flush",
"get",
"merge_instrumentation_report",
"set_instrumentation",
"set_report_path",
)
224 changes: 224 additions & 0 deletions src/dvsim/instrumentation/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# Copyright lowRISC contributors (OpenTitan project).
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

"""DVSim scheduler instrumentation base classes."""

import json
from collections.abc import Collection, Iterable, Sequence
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, TypeAlias

from dvsim.job.data import JobSpec
from dvsim.job.status import JobStatus
from dvsim.logging import log

# Names exported by this module: the fragment base classes, the instrumentation
# base/no-op/composite classes, the fragments type alias and the report merge helper.
__all__ = (
"CompositeInstrumentation",
"InstrumentationFragment",
"InstrumentationFragments",
"JobFragment",
"NoOpInstrumentation",
"SchedulerFragment",
"SchedulerInstrumentation",
"merge_instrumentation_report",
)


@dataclass
class InstrumentationFragment:
    """Common base for every piece of an instrumentation report.

    Subclasses declare dataclass fields for the data they carry; this base only
    provides the conversion of those fields into a plain dictionary.
    """

    def to_dict(self) -> dict[str, Any]:
        """Return this fragment's dataclass fields as a dictionary."""
        as_mapping: dict[str, Any] = asdict(self)
        return as_mapping


@dataclass
class SchedulerFragment(InstrumentationFragment):
    """Report fragment carrying information about the scheduler itself.

    Acts purely as a marker base class: it declares no fields of its own.
    """


@dataclass
class JobFragment(InstrumentationFragment):
    """Report fragment carrying information about one individual job."""

    # The job that this fragment describes.
    job: JobSpec


def merge_instrumentation_report(
    scheduler_fragments: Collection[SchedulerFragment], job_fragments: Collection[JobFragment]
) -> dict[str, Any]:
    """Combine scheduler and job report fragments into a single report dictionary.

    All scheduler fragments are flattened into one mapping, with later fragments
    overwriting keys set by earlier ones. Job fragments are grouped per job, where a job
    is identified by its full name together with its target; the fragment's own ``job``
    entry is left out of the merged data.

    Arguments:
        scheduler_fragments: Fragments describing the scheduler.
        job_fragments: Fragments describing individual jobs.

    Returns:
        A dictionary with a ``"scheduler"`` mapping and a ``"jobs"`` list holding one
        merged mapping per distinct job, in order of first appearance.

    """
    log.info("Merging instrumentation report data...")

    # Scheduler-wide data: flatten every fragment into one mapping.
    merged_scheduler: dict[str, Any] = {}
    for index, fragment in enumerate(scheduler_fragments, start=1):
        log.debug(
            "Merging instrumentation report scheduler data (%d/%d)",
            index,
            len(scheduler_fragments),
        )
        merged_scheduler.update(fragment.to_dict())

    # Per-job data: one entry per (full name, target) pair, which uniquely identifies a job.
    merged_jobs: dict[tuple[str, str], dict[str, Any]] = {}
    for index, fragment in enumerate(job_fragments, start=1):
        log.debug("Merging instrumentation report job data (%d/%d)", index, len(job_fragments))
        job_spec = fragment.job
        entry = merged_jobs.setdefault((job_spec.full_name, job_spec.target), {})
        for field_name, value in fragment.to_dict().items():
            # The job spec only serves as the grouping key; it is not part of the report.
            if field_name == "job":
                continue
            entry[field_name] = value

    log.info("Finished merging instrumentation report data.")
    return {"scheduler": merged_scheduler, "jobs": list(merged_jobs.values())}


# Each instrumentation object can report any number of information fragments about the
# scheduler and about its jobs. The first element holds the scheduler fragments and the
# second element holds the job fragments.
InstrumentationFragments: TypeAlias = tuple[Sequence[SchedulerFragment], Sequence[JobFragment]]


class SchedulerInstrumentation:
    """Instrumentation for the scheduler.

    Base class for scheduler instrumentation, recording a variety of performance and
    behavioural metrics for analysis. Every hook in this base class does nothing, so
    subclasses only need to override the ones they care about.
    """

    @property
    def name(self) -> str:
        """The name to use to refer to this instrumentation mechanism."""
        return self.__class__.__name__

    def start(self) -> None:
        """Begin instrumentation, starting whatever is needed before the scheduler is run.

        Logs the start and then delegates to ``_start``.
        """
        log.info("Starting instrumentation: %s", self.name)
        self._start()

    def _start(self) -> None:
        """Subclass hook run by ``start``; does nothing by default."""
        return None

    def stop(self) -> None:
        """Stop instrumentation, ending any instrumentation-specific resources.

        Logs the stop and then delegates to ``_stop``.
        """
        log.info("Stopping instrumentation: %s", self.name)
        self._stop()

    def _stop(self) -> None:
        """Subclass hook run by ``stop``; does nothing by default."""
        return None

    def on_scheduler_start(self) -> None:
        """Notify instrumentation that the scheduler has begun."""
        return

    def on_scheduler_end(self) -> None:
        """Notify instrumentation that the scheduler has finished."""
        return

    def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:  # noqa: ARG002
        """Notify instrumentation of a change in status for some scheduled job."""
        return

    def build_report_fragments(self) -> InstrumentationFragments | None:
        """Build report fragments from the collected instrumentation information.

        Returns:
            The scheduler and job fragments, or ``None`` when there is nothing to report
            (which is always the case for this base implementation).

        """
        return None

    def build_report(self) -> dict[str, Any] | None:
        """Build an instrumentation report dict containing collected instrumentation info.

        Returns:
            The merged report, or ``None`` if no report fragments were produced.

        """
        log.info("Building instrumentation report...")
        fragments = self.build_report_fragments()
        return None if fragments is None else merge_instrumentation_report(*fragments)

    def dump_json_report(self, report_path: Path) -> None:
        """Build the instrumentation report and write it as JSON to the given file path.

        Nothing is written when there is no report to dump. Failures while creating the
        parent directory or writing the file are logged as errors rather than raised.

        Arguments:
            report_path: The file to write the JSON report to.

        Raises:
            ValueError: If ``report_path`` is an existing directory.

        """
        report = self.build_report()
        if not report:
            return
        log.info("Dumping JSON instrumentation report...")
        if report_path.is_dir():
            raise ValueError("Metric report path cannot be a directory.")
        try:
            report_path.parent.mkdir(parents=True, exist_ok=True)
            # Be explicit about the encoding instead of depending on the locale default.
            report_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
        except OSError as e:
            # FileNotFoundError is a subclass of OSError, so OSError alone covers it.
            log.error("Error writing instrumented metrics to %s: %s", str(report_path), str(e))
        else:
            log.info("JSON instrumentation report dumped to: %s", str(report_path))


class NoOpInstrumentation(SchedulerInstrumentation):
    """Instrumentation that performs no work and never produces a report."""

    def start(self) -> None:
        """Skip starting entirely: no setup is performed and nothing is logged."""
        return

    def stop(self) -> None:
        """Skip stopping entirely: no teardown is performed and nothing is logged."""
        return

    def build_report(self) -> dict[str, Any] | None:
        """Return no report at all (always ``None``), without logging anything."""
        return None


class CompositeInstrumentation(SchedulerInstrumentation):
    """Composite instrumentation for combining several instrumentations to be used at once.

    Every notification is forwarded to each composed instrumentation in the order they
    were given, and their report fragments are concatenated into one set of fragments.
    """

    def __init__(self, instrumentations: Iterable[SchedulerInstrumentation]) -> None:
        """Construct an instrumentation object composed of many instrumentations.

        Arguments:
            instrumentations: The instrumentations to compose. Any iterable is accepted;
                its contents are captured once, at construction time.

        """
        super().__init__()
        # Snapshot the iterable: it is walked again for every notification and by `name`,
        # so a one-shot iterable (e.g. a generator) would otherwise be exhausted after the
        # first pass and all later passes would silently do nothing.
        self._instrumentations: tuple[SchedulerInstrumentation, ...] = tuple(instrumentations)

    @property
    def name(self) -> str:
        """The name to use to refer to this composed instrumentation."""
        composed = ", ".join(inst.name for inst in self._instrumentations)
        return f"CompositeInstrumentation({composed})"

    def _start(self) -> None:
        """Start every composed instrumentation."""
        for inst in self._instrumentations:
            inst.start()

    def _stop(self) -> None:
        """Stop every composed instrumentation."""
        for inst in self._instrumentations:
            inst.stop()

    def on_scheduler_start(self) -> None:
        """Notify instrumentation that the scheduler has begun."""
        for inst in self._instrumentations:
            inst.on_scheduler_start()

    def on_scheduler_end(self) -> None:
        """Notify instrumentation that the scheduler has finished."""
        for inst in self._instrumentations:
            inst.on_scheduler_end()

    def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
        """Notify instrumentation of a change in status for some scheduled job."""
        for inst in self._instrumentations:
            inst.on_job_status_change(job, status)

    def build_report_fragments(self) -> InstrumentationFragments | None:
        """Build report fragments from the collected instrumentation information.

        Returns:
            The scheduler fragments and job fragments of all composed instrumentations,
            concatenated in composition order. Instrumentations that report no fragments
            are skipped, so both sequences may be empty.

        """
        scheduler_fragments: list[SchedulerFragment] = []
        job_fragments: list[JobFragment] = []

        for inst in self._instrumentations:
            fragments = inst.build_report_fragments()
            if fragments is None:
                continue
            inst_scheduler_fragments, inst_job_fragments = fragments
            scheduler_fragments.extend(inst_scheduler_fragments)
            job_fragments.extend(inst_job_fragments)

        return (scheduler_fragments, job_fragments)
Loading
Loading