Skip to content

Commit 853ae4b

Browse files
TD-P005: heartbeat process_metrics report lifetime aggregates as instantaneous (#45)
1 parent b96380c commit 853ae4b

3 files changed

Lines changed: 133 additions & 14 deletions

File tree

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
### Fixed
10+
- Worker heartbeat `process_metrics` now report instantaneous values
11+
instead of process-lifetime aggregates. `cpu_percent` is the share of
12+
wall time the worker spent on CPU during the interval since the
13+
previous heartbeat (previously the lifetime average, which converged
14+
to a fixed value within minutes and stopped tracking live load), and
15+
`memory_bytes` is the current resident set size sampled from
16+
`/proc/self/statm` on Linux (previously `ru_maxrss`, which is the
17+
process-lifetime high-water mark and never decreased after a startup
18+
spike). Platforms without `/proc` no longer report `memory_bytes`
19+
rather than reporting a misleading peak. The heartbeat protocol shape
20+
is unchanged; the server records whatever the worker sends, so the
21+
Worker Status surface starts showing accurate live numbers as soon as
22+
workers upgrade.
23+
924
### Changed
1025
- `tests/test_client.py` now closes the `schedule.history` polyglot
1126
parity slice. `test_get_schedule_history_matches_polyglot_fixture`

src/durable_workflow/worker.py

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,13 @@ def __init__(
334334
self._activity_inflight = 0
335335
self._heartbeat_interval = float(heartbeat_interval)
336336
self._process_started_at = time.time()
337+
# CPU sampling baseline. The heartbeat reports an *instantaneous*
338+
# cpu_percent — CPU time burned in the interval since the previous
339+
# heartbeat, divided by that interval — rather than the lifetime
340+
# average. ``_last_cpu_sample_at`` is None until the first sample
341+
# has been taken; subsequent samples diff against it.
342+
self._last_cpu_sample_at: float | None = None
343+
self._last_cpu_total_seconds: float = 0.0
337344
configured_metrics = metrics if metrics is not None else getattr(client, "metrics", NOOP_METRICS)
338345
self.metrics: MetricsRecorder = configured_metrics or NOOP_METRICS
339346
self.interceptors = tuple(interceptors)
@@ -1152,32 +1159,54 @@ def _current_process_metrics(self) -> dict[str, Any]:
11521159
import os
11531160
import socket
11541161

1162+
now = time.time()
11551163
metrics: dict[str, Any] = {
1156-
"process_uptime_seconds": int(time.time() - self._process_started_at),
1164+
"process_uptime_seconds": int(now - self._process_started_at),
11571165
"process_id": os.getpid(),
11581166
}
11591167

1168+
# ``memory_bytes`` is the *current* resident set size, not the
1169+
# lifetime peak. ``resource.getrusage().ru_maxrss`` is the high-
1170+
# water mark since the process started, which masks freed memory
1171+
# and never decreases — wrong shape for a heartbeat metric meant
1172+
# to show what the worker is using right now. On Linux we read
1173+
# the second field of ``/proc/self/statm`` (resident pages) and
1174+
# multiply by the page size. Platforms without ``/proc`` get no
1175+
# ``memory_bytes`` field rather than a misleading lifetime peak.
1176+
if sys.platform.startswith("linux"):
1177+
try:
1178+
with open("/proc/self/statm") as statm:
1179+
fields = statm.read().split()
1180+
if len(fields) >= 2:
1181+
metrics["memory_bytes"] = int(fields[1]) * os.sysconf("SC_PAGE_SIZE")
1182+
except (OSError, ValueError):
1183+
pass
1184+
1185+
# ``cpu_percent`` is the share of wall time the process spent on
1186+
# CPU during the interval since the previous heartbeat — not the
1187+
# lifetime average, which converges to a fixed value and stops
1188+
# tracking live load. The first sample bootstraps from process
1189+
# start so the very first heartbeat still has a number.
11601190
try:
11611191
import resource
11621192

11631193
usage = resource.getrusage(resource.RUSAGE_SELF)
1164-
# ru_maxrss is kilobytes on Linux and bytes on macOS — normalize
1165-
# to bytes. The server stores whatever is sent so the units stay
1166-
# consistent across SDKs.
1167-
if sys.platform == "darwin":
1168-
metrics["memory_bytes"] = int(usage.ru_maxrss)
1169-
else:
1170-
metrics["memory_bytes"] = int(usage.ru_maxrss) * 1024
1171-
11721194
cpu_seconds = float(usage.ru_utime) + float(usage.ru_stime)
1173-
wall_seconds = max(0.001, time.time() - self._process_started_at)
1195+
if self._last_cpu_sample_at is None:
1196+
interval = max(0.001, now - self._process_started_at)
1197+
delta_cpu = max(0.0, cpu_seconds)
1198+
else:
1199+
interval = max(0.001, now - self._last_cpu_sample_at)
1200+
delta_cpu = max(0.0, cpu_seconds - self._last_cpu_total_seconds)
11741201
metrics["cpu_percent"] = max(
1175-
0.0, min(100.0, round((cpu_seconds / wall_seconds) * 100.0, 2))
1202+
0.0, min(100.0, round((delta_cpu / interval) * 100.0, 2))
11761203
)
1204+
self._last_cpu_sample_at = now
1205+
self._last_cpu_total_seconds = cpu_seconds
11771206
except (ImportError, OSError):
1178-
# `resource` is POSIX-only — Windows skips getrusage but still
1179-
# reports pid + uptime + host so the operator surface remains
1180-
# populated.
1207+
# ``resource`` is POSIX-only — Windows skips the CPU sample
1208+
# but still reports pid + uptime + host so the operator
1209+
# surface stays populated.
11811210
pass
11821211

11831212
try:

tests/test_worker.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import contextlib
55
import logging
6+
import sys
67
from unittest.mock import AsyncMock
78

89
import pytest
@@ -1436,6 +1437,80 @@ async def test_heartbeat_loop_survives_transient_errors(
14361437
await run_task
14371438
assert mock_client.heartbeat_worker.call_count >= 2
14381439

1440+
def test_process_metrics_cpu_percent_is_instantaneous_not_lifetime(
1441+
self, mock_client: AsyncMock, monkeypatch: pytest.MonkeyPatch
1442+
) -> None:
1443+
"""``cpu_percent`` reflects only the interval since the previous
1444+
heartbeat. A worker that was busy at startup and idle ever since
1445+
used to keep reporting the lifetime average forever, hiding the
1446+
fact that it is no longer doing CPU work."""
1447+
1448+
import resource
1449+
1450+
from durable_workflow import worker as worker_mod
1451+
1452+
worker = Worker(
1453+
mock_client,
1454+
task_queue="q1",
1455+
workflows=[TestWorkflow],
1456+
activities=[echo_activity],
1457+
)
1458+
worker._process_started_at = 1000.0
1459+
1460+
class FakeUsage:
1461+
def __init__(self, utime: float, stime: float) -> None:
1462+
self.ru_utime = utime
1463+
self.ru_stime = stime
1464+
self.ru_maxrss = 0
1465+
1466+
fake_now = {"value": 1010.0}
1467+
fake_usage = {"value": FakeUsage(7.0, 1.0)}
1468+
1469+
monkeypatch.setattr(worker_mod.time, "time", lambda: fake_now["value"])
1470+
monkeypatch.setattr(resource, "getrusage", lambda _who: fake_usage["value"])
1471+
1472+
# First sample: 8s of CPU over 10s of wall time since process start = 80%.
1473+
first = worker._current_process_metrics()
1474+
assert first["cpu_percent"] == 80.0
1475+
1476+
# Ten more wall seconds with only 0.5s of additional CPU (the
1477+
# worker went idle). Lifetime average would still be 8.5/20 =
1478+
# 42.5%, but the instantaneous reading is 5%.
1479+
fake_now["value"] = 1020.0
1480+
fake_usage["value"] = FakeUsage(7.3, 1.2)
1481+
second = worker._current_process_metrics()
1482+
assert second["cpu_percent"] == 5.0
1483+
1484+
# Fully idle for ten more seconds. Used to be ~32% (lifetime
1485+
# average), should now be 0.
1486+
fake_now["value"] = 1030.0
1487+
third = worker._current_process_metrics()
1488+
assert third["cpu_percent"] == 0.0
1489+
assert third["process_uptime_seconds"] == 30
1490+
1491+
def test_process_metrics_memory_bytes_is_current_resident_set(
1492+
self, mock_client: AsyncMock
1493+
) -> None:
1494+
"""``memory_bytes`` is the current resident set size on Linux —
1495+
read from ``/proc/self/statm`` — not ``ru_maxrss``, which is the
1496+
process-lifetime high-water mark and never decreases after a
1497+
startup spike."""
1498+
1499+
if not sys.platform.startswith("linux"):
1500+
pytest.skip("memory_bytes is only sampled on Linux")
1501+
1502+
worker = Worker(
1503+
mock_client,
1504+
task_queue="q1",
1505+
workflows=[TestWorkflow],
1506+
activities=[echo_activity],
1507+
)
1508+
1509+
metrics = worker._current_process_metrics()
1510+
assert "memory_bytes" in metrics
1511+
assert isinstance(metrics["memory_bytes"], int)
1512+
assert metrics["memory_bytes"] > 0
1513+
14391514

14401515
class TestRunUntil:
14411516
@pytest.mark.asyncio

0 commit comments

Comments
 (0)