Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/dvsim/instrumentation/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class MetadataJobFragment(JobFragment):
full_name: str
job_type: str
target: str
tool: str
dependencies: list[str]
status: str

Expand Down Expand Up @@ -59,6 +60,7 @@ def build_report_fragments(self) -> InstrumentationFragments | None:
spec.full_name,
spec.job_type,
spec.target,
spec.tool.name,
spec.dependencies,
status_str,
)
Expand Down
23 changes: 4 additions & 19 deletions src/dvsim/instrumentation/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ class ResourceJobFragment(JobFragment):
max_rss_bytes: int | None = None
avg_rss_bytes: float | None = None
avg_cpu_percent: float | None = None
avg_cpu_per_core: list[float] | None = None

num_resource_samples: int = 0

Expand All @@ -69,50 +68,36 @@ class JobResourceAggregate:
Tracks aggregate information over a number of samples whilst minimizing memory usage.
"""

def __init__(self, job: JobSpec, num_cores: int | None) -> None:
def __init__(self, job: JobSpec) -> None:
"""Construct an aggregate for storing sampling info for a given job specification.

Arguments:
job: The specification of the job which is having its information aggregated.
num_cores: The number of logical CPU cores available on the host system.

"""
self.job_spec = job
self.num_cores = num_cores
self.sample_count = 0
self.sum_rss = 0.0
self.max_rss = 0
self.sum_cpu = 0.0
if num_cores:
self.sum_cpu_per_core = [0.0] * num_cores

def add_sample(self, rss: int, cpu: float, cpu_per_core: list[float]) -> None:
def add_sample(self, rss: int, cpu: float) -> None:
"""Aggregate an additional resource sample taken during this job's active window."""
self.sample_count += 1
self.sum_rss += rss
self.max_rss = max(self.max_rss, rss)
self.sum_cpu += cpu
if self.num_cores:
self.sum_cpu_per_core = [
total + n for total, n in zip(self.sum_cpu_per_core, cpu_per_core, strict=True)
]

def finalize(self) -> ResourceJobFragment:
"""Finalize the aggregated information for a job, generating a report fragment."""
if self.sample_count == 0:
return ResourceJobFragment(self.job_spec)

if self.num_cores:
avg_cpu_per_core = [x / self.sample_count for x in self.sum_cpu_per_core]
else:
avg_cpu_per_core = None

return ResourceJobFragment(
self.job_spec,
max_rss_bytes=self.max_rss,
avg_rss_bytes=self.sum_rss / self.sample_count,
avg_cpu_percent=self.sum_cpu / self.sample_count,
avg_cpu_per_core=avg_cpu_per_core,
num_resource_samples=self.sample_count,
)

Expand Down Expand Up @@ -218,7 +203,7 @@ def _sampling_loop(self) -> None:
# Update all running job aggregates with system sample
with self._lock:
for aggregate in self._running_jobs.values():
aggregate.add_sample(sys_rss, sys_cpu, sys_cpu_per_core)
aggregate.add_sample(sys_rss, sys_cpu)

sleep_time = max(next_run_at - time.time(), 0)
time.sleep(sleep_time)
Expand All @@ -239,7 +224,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
running = job_id in self._running_jobs
started = running or job_id in self._finished_jobs
if not started and status != JobStatus.QUEUED:
self._running_jobs[job_id] = JobResourceAggregate(job, self._num_cores)
self._running_jobs[job_id] = JobResourceAggregate(job)
running = True
if running and status.ended:
aggregates = self._running_jobs.pop(job_id)
Expand Down
Loading