diff --git a/src/dvsim/instrumentation/metadata.py b/src/dvsim/instrumentation/metadata.py index d8dd6d5..7871db1 100644 --- a/src/dvsim/instrumentation/metadata.py +++ b/src/dvsim/instrumentation/metadata.py @@ -28,6 +28,7 @@ class MetadataJobFragment(JobFragment): full_name: str job_type: str target: str + tool: str dependencies: list[str] status: str @@ -59,6 +60,7 @@ def build_report_fragments(self) -> InstrumentationFragments | None: spec.full_name, spec.job_type, spec.target, + spec.tool.name, spec.dependencies, status_str, ) diff --git a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py index f010f01..7f54e49 100644 --- a/src/dvsim/instrumentation/resources.py +++ b/src/dvsim/instrumentation/resources.py @@ -58,7 +58,6 @@ class ResourceJobFragment(JobFragment): max_rss_bytes: int | None = None avg_rss_bytes: float | None = None avg_cpu_percent: float | None = None - avg_cpu_per_core: list[float] | None = None num_resource_samples: int = 0 @@ -69,50 +68,36 @@ class JobResourceAggregate: Tracks aggregate information over a number of samples whilst minimizing memory usage. """ - def __init__(self, job: JobSpec, num_cores: int | None) -> None: + def __init__(self, job: JobSpec) -> None: """Construct an aggregate for storing sampling info for a given job specification. Arguments: job: The specification of the job which is having its information aggregated. - num_cores: The number of logical CPU cores available on the host system. """ self.job_spec = job - self.num_cores = num_cores self.sample_count = 0 self.sum_rss = 0.0 self.max_rss = 0 self.sum_cpu = 0.0 - if num_cores: - self.sum_cpu_per_core = [0.0] * num_cores - def add_sample(self, rss: int, cpu: float, cpu_per_core: list[float]) -> None: + def add_sample(self, rss: int, cpu: float) -> None: """Aggregate an additional resource sample taken during this job's active window.""" self.sample_count += 1 self.sum_rss += rss self.max_rss = max(self.max_rss, rss) self.sum_cpu += cpu - if self.num_cores: - self.sum_cpu_per_core = [ - total + n for total, n in zip(self.sum_cpu_per_core, cpu_per_core, strict=True) - ] def finalize(self) -> ResourceJobFragment: """Finalize the aggregated information for a job, generating a report fragment.""" if self.sample_count == 0: return ResourceJobFragment(self.job_spec) - if self.num_cores: - avg_cpu_per_core = [x / self.sample_count for x in self.sum_cpu_per_core] - else: - avg_cpu_per_core = None - return ResourceJobFragment( self.job_spec, max_rss_bytes=self.max_rss, avg_rss_bytes=self.sum_rss / self.sample_count, avg_cpu_percent=self.sum_cpu / self.sample_count, - avg_cpu_per_core=avg_cpu_per_core, num_resource_samples=self.sample_count, ) @@ -218,7 +203,7 @@ def _sampling_loop(self) -> None: # Update all running job aggregates with system sample with self._lock: for aggregate in self._running_jobs.values(): - aggregate.add_sample(sys_rss, sys_cpu, sys_cpu_per_core) + aggregate.add_sample(sys_rss, sys_cpu) sleep_time = max(next_run_at - time.time(), 0) time.sleep(sleep_time) @@ -239,7 +224,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: running = job_id in self._running_jobs started = running or job_id in self._finished_jobs if not started and status != JobStatus.QUEUED: - self._running_jobs[job_id] = JobResourceAggregate(job, self._num_cores) + self._running_jobs[job_id] = JobResourceAggregate(job) running = True if running and status.ended: aggregates = self._running_jobs.pop(job_id)