Commit 3bf7d1f

feat(streaming): emit OTel metrics for ttft, tps, token counts (#347)
1 parent 5a53858 commit 3bf7d1f

8 files changed

Lines changed: 547 additions & 3 deletions

File tree

src/agentex/lib/core/observability/__init__.py

Whitespace-only changes.
src/agentex/lib/core/observability/llm_metrics.py

Lines changed: 121 additions & 0 deletions
```python
"""OTel metrics for LLM calls.

Single source of truth for LLM-call instrumentation across all agentex code
paths — temporal+openai_agents streaming today, sync ACP and the Claude SDK
plugin in future PRs. Centralizing the instrument definitions here means
those follow-ups don't need to redefine the metric names, units, or
description strings; they import ``get_llm_metrics()`` and record values.

The meter is no-op when the application hasn't configured a ``MeterProvider``,
so importing this module is safe for runtimes that don't use OTel. Instruments
are created lazily on first ``get_llm_metrics()`` call so a ``MeterProvider``
configured *after* this module is imported still binds correctly.

Cardinality is bounded:
- All metrics carry only ``model`` (the LLM model name).
- ``requests`` additionally carries ``status``, drawn from a small fixed set
  (see ``classify_status``).

Resource attributes (``service.name``, ``k8s.*``, etc.) come from the
application's OTel resource configuration and are added to every series
automatically.
"""

from __future__ import annotations

from typing import Optional

from opentelemetry import metrics


class LLMMetrics:
    """Lazily-created OTel instruments for LLM call telemetry."""

    def __init__(self) -> None:
        meter = metrics.get_meter("agentex.llm")
        self.requests = meter.create_counter(
            name="agentex.llm.requests",
            unit="1",
            description=(
                "LLM call count tagged with status (success / rate_limit / "
                "server_error / client_error / timeout / network_error / "
                "other_error). Use to alert on 429s, 5xxs, etc."
            ),
        )
        self.ttft_ms = meter.create_histogram(
            name="agentex.llm.ttft",
            unit="ms",
            description="Time from request submission to first content token (ms)",
        )
        # ttat (time-to-first-answering-token) is distinct from ttft for reasoning
        # models: ttft fires on the first reasoning chunk (which arrives quickly),
        # while ttat fires on the first user-visible answer token (text or tool
        # call). For non-reasoning models the two are equal.
        self.ttat_ms = meter.create_histogram(
            name="agentex.llm.ttat",
            unit="ms",
            description="Time from request submission to first answering token (text or tool-call delta) — excludes reasoning chunks",
        )
        # Note: TPS denominator is the model-generation window
        # (last_token_time - first_token_time), not total stream wall time.
        # This isolates raw model throughput from event-loop / tool-call latency.
        self.tps = meter.create_histogram(
            name="agentex.llm.tps",
            unit="tokens/s",
            description="Output tokens per second over the generation window",
        )
        self.input_tokens = meter.create_counter(
            name="agentex.llm.input_tokens",
            unit="tokens",
            description="Total input tokens sent to the LLM",
        )
        self.output_tokens = meter.create_counter(
            name="agentex.llm.output_tokens",
            unit="tokens",
            description="Total output tokens returned by the LLM",
        )
        self.cached_input_tokens = meter.create_counter(
            name="agentex.llm.cached_input_tokens",
            unit="tokens",
            description="Subset of input tokens served from prompt cache",
        )
        self.reasoning_tokens = meter.create_counter(
            name="agentex.llm.reasoning_tokens",
            unit="tokens",
            description="Output tokens spent on reasoning (subset of output_tokens)",
        )


_llm_metrics: Optional[LLMMetrics] = None


def get_llm_metrics() -> LLMMetrics:
    """Return the LLM metrics singleton, creating it on first use."""
    global _llm_metrics
    if _llm_metrics is None:
        _llm_metrics = LLMMetrics()
    return _llm_metrics


def classify_status(exc: Optional[BaseException]) -> str:
    """Categorize an LLM call's outcome into a small fixed set of status labels.

    A successful call returns ``"success"``. Exceptions are mapped by type name
    so we don't depend on a specific provider SDK's exception class hierarchy:
    OpenAI, Anthropic, and other providers all use names like ``RateLimitError``,
    ``APITimeoutError``, ``InternalServerError``, etc.
    """
    if exc is None:
        return "success"
    name = type(exc).__name__
    if "RateLimit" in name:
        return "rate_limit"
    if "Timeout" in name:
        return "timeout"
    if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway")):
        return "server_error"
    if "Connection" in name:
        return "network_error"
    if any(s in name for s in ("BadRequest", "Authentication", "Permission", "NotFound", "Conflict", "UnprocessableEntity")):
        return "client_error"
    return "other_error"
```
Lines changed: 57 additions & 0 deletions
```python
"""``RunHooks`` adapter that emits per-call LLM metrics.

Used by the sync ACP path and as a base class for ``TemporalStreamingHooks``
on the async path, so token / request / cache metrics emit consistently
across both. Streaming-only metrics (ttft, ttat, tps) are emitted from the
streaming model itself, not here — hooks don't see individual chunks.
"""

from __future__ import annotations

from typing import Any
from typing_extensions import override

from agents import Agent, RunHooks, ModelResponse, RunContextWrapper

from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics


class LLMMetricsHooks(RunHooks):
    """Emits ``agentex.llm.requests`` + token counters on every LLM call."""

    @override
    async def on_llm_end(
        self,
        context: RunContextWrapper[Any],
        agent: Agent[Any],
        response: ModelResponse,
    ) -> None:
        del context  # part of the RunHooks contract; unused here
        m = get_llm_metrics()
        attrs = {"model": str(agent.model) if agent.model else "unknown"}
        # Request counter only depends on agent.model, so emit it first and
        # outside the usage-extraction try block. Token counters reach into
        # nested optional fields and are best-effort: a non-OpenAI provider
        # (litellm-routed Anthropic, etc.) may return a Usage shape missing
        # input_tokens_details / output_tokens_details — we emit zeros where
        # we can and skip the rest rather than crash the caller.
        try:
            m.requests.add(1, {**attrs, "status": "success"})
        except Exception:
            pass
        try:
            usage = response.usage
            m.input_tokens.add(usage.input_tokens or 0, attrs)
            m.output_tokens.add(usage.output_tokens or 0, attrs)
            m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs)
            m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, attrs)
        except Exception:
            pass


def record_llm_failure(model: str, exc: BaseException) -> None:
    """Best-effort counter bump for an LLM call that raised before ``on_llm_end``."""
    try:
        get_llm_metrics().requests.add(1, {"model": model, "status": classify_status(exc)})
    except Exception:
        pass
```
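Wiring the per-call counters is just a ``hooks=`` argument on a run. A hedged sketch against the ``agents`` SDK; the agent definition, prompt, and the ``llm_metrics_hooks`` module path are placeholders, since this hunk doesn't show the file's actual path:

```python
import asyncio

from agents import Agent, Runner

from agentex.lib.core.observability.llm_metrics_hooks import (  # hypothetical path
    LLMMetricsHooks,
    record_llm_failure,
)


async def main() -> None:
    agent = Agent(name="assistant", model="gpt-4o")  # placeholder agent
    try:
        result = await Runner.run(agent, "Hello!", hooks=LLMMetricsHooks())
        print(result.final_output)
    except Exception as exc:
        # The run raised before on_llm_end could fire; record the failure status.
        record_llm_failure(model="gpt-4o", exc=exc)
        raise


asyncio.run(main())
```

The streaming-side metrics (ttft, ttat, tps) live in the streaming model, which is not part of this hunk. The bookkeeping would look roughly like the sketch below, where the chunk attributes (``kind``, ``token_count``) are hypothetical:

```python
import time

from agentex.lib.core.observability.llm_metrics import get_llm_metrics


def record_stream_timings(chunks, attrs):  # hypothetical helper, not in this commit
    """Sketch of ttft / ttat / tps bookkeeping over a chunk iterator."""
    m = get_llm_metrics()
    start = time.monotonic()
    first = answered = last = None
    n_tokens = 0
    for chunk in chunks:
        now = time.monotonic()
        if first is None:
            first = now
            m.ttft_ms.record((now - start) * 1000, attrs)
        # Answering token = text or tool-call delta, i.e. not a reasoning chunk.
        if answered is None and getattr(chunk, "kind", "text") != "reasoning":
            answered = now
            m.ttat_ms.record((now - start) * 1000, attrs)
        last = now
        n_tokens += getattr(chunk, "token_count", 1)
    window = (last - first) if first is not None and last > first else 0.0
    if window > 0:
        # Denominator is the generation window, per the note on the tps instrument.
        m.tps.record(n_tokens / window, attrs)
```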

src/agentex/lib/core/observability/tests/__init__.py

Whitespace-only changes.
Lines changed: 83 additions & 0 deletions
```python
"""Tests for ``agentex.lib.core.observability.llm_metrics``."""

from __future__ import annotations

import agentex.lib.core.observability.llm_metrics as llm_metrics
from agentex.lib.core.observability.llm_metrics import (
    LLMMetrics,
    classify_status,
    get_llm_metrics,
)


class TestClassifyStatus:
    def test_none_is_success(self):
        assert classify_status(None) == "success"

    def test_rate_limit(self):
        class RateLimitError(Exception):
            pass

        assert classify_status(RateLimitError()) == "rate_limit"

    def test_timeout(self):
        class APITimeoutError(Exception):
            pass

        assert classify_status(APITimeoutError()) == "timeout"

    def test_server_error(self):
        class InternalServerError(Exception):
            pass

        assert classify_status(InternalServerError()) == "server_error"

        class ServiceUnavailable(Exception):
            pass

        assert classify_status(ServiceUnavailable()) == "server_error"

    def test_network_error(self):
        class APIConnectionError(Exception):
            pass

        assert classify_status(APIConnectionError()) == "network_error"

    def test_client_error(self):
        for cls_name in ("BadRequestError", "AuthenticationError", "PermissionError"):
            cls = type(cls_name, (Exception,), {})
            assert classify_status(cls()) == "client_error"

    def test_unknown_falls_back(self):
        class WeirdProviderException(Exception):
            pass

        assert classify_status(WeirdProviderException()) == "other_error"


class TestGetLLMMetrics:
    def test_returns_llm_metrics_instance(self, monkeypatch):
        monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
        m = get_llm_metrics()
        assert isinstance(m, LLMMetrics)

    def test_singleton_returns_same_instance(self, monkeypatch):
        monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
        first = get_llm_metrics()
        second = get_llm_metrics()
        assert first is second

    def test_instruments_exist(self, monkeypatch):
        monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
        m = get_llm_metrics()
        for name in (
            "requests",
            "ttft_ms",
            "ttat_ms",
            "tps",
            "input_tokens",
            "output_tokens",
            "cached_input_tokens",
            "reasoning_tokens",
        ):
            assert hasattr(m, name), f"missing instrument: {name}"
```
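Each ``TestGetLLMMetrics`` case resets the module-level singleton by hand. If this suite grows, an autouse fixture (a possible refactor, not part of this commit) would centralize the reset:

```python
import pytest

import agentex.lib.core.observability.llm_metrics as llm_metrics


@pytest.fixture(autouse=True)
def _reset_llm_metrics_singleton(monkeypatch):
    # Clear the cached singleton so every test exercises lazy creation.
    monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
```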
