Skip to content

Commit 8ecce16

Browse files
jsonbaileyclaude
andcommitted
feat: Update OpenAI graph runner to return AgentGraphRunnerResult with GraphMetrics
Remove all direct LaunchDarkly tracker calls from OpenAIAgentGraphRunner. The runner now collects per-node metrics via _NodeMetricsAccumulator (a lightweight accumulator replacing the per-node LDAIConfigTracker) and returns AgentGraphRunnerResult with populated GraphMetrics (path, duration_ms, usage, node_metrics). Graph-level and per-node tracking events are emitted by ManagedAgentGraph._flush_graph_tracking() from the result. ManagedAgentGraph._flush_graph_tracking() is extended to also drive per-node tracking from result.metrics.node_metrics using the graph definition's node tracker factories. Integration tests in test_tracking_openai_agents.py are updated to run through the full ManagedAgentGraph pipeline (ManagedAgentGraph.run()) so tracking events are emitted by the managed layer as intended. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 14cfa92 commit 8ecce16

4 files changed

Lines changed: 168 additions & 124 deletions

File tree

packages/ai-providers/server-ai-openai/src/ldai_openai/openai_agent_graph_runner.py

Lines changed: 83 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
from ldai import log
66
from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode
7-
from ldai.providers import AgentGraphResult, AgentGraphRunner, ToolRegistry
8-
from ldai.providers.types import LDAIMetrics
7+
from ldai.providers import AgentGraphRunner, ToolRegistry
8+
from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics, LDAIMetrics
99
from ldai.tracker import TokenUsage
1010

1111
from ldai_openai.openai_helper import (
@@ -22,6 +22,34 @@ def _sanitize_agent_name(key: str) -> str:
2222
return re.sub(r'[^a-zA-Z0-9_]', '_', key)
2323

2424

25+
class _NodeMetricsAccumulator:
26+
"""Mutable per-node metrics collected during a run (replaces LDAIConfigTracker)."""
27+
28+
def __init__(self) -> None:
29+
self.usage: Optional[TokenUsage] = None
30+
self.duration_ms: Optional[int] = None
31+
self.tool_calls: List[str] = []
32+
self.success: bool = True
33+
34+
def set_usage(self, usage: Optional[TokenUsage]) -> None:
35+
if usage is not None:
36+
self.usage = usage
37+
38+
def set_duration_ms(self, duration_ms: int) -> None:
39+
self.duration_ms = duration_ms
40+
41+
def add_tool_call(self, tool_name: str) -> None:
42+
self.tool_calls.append(tool_name)
43+
44+
def to_ldai_metrics(self) -> LDAIMetrics:
45+
return LDAIMetrics(
46+
success=self.success,
47+
usage=self.usage,
48+
duration_ms=self.duration_ms,
49+
tool_calls=self.tool_calls if self.tool_calls else None,
50+
)
51+
52+
2553
class _RunState:
2654
"""Mutable state shared across handoff and tool callbacks during a single run."""
2755

@@ -39,9 +67,10 @@ class OpenAIAgentGraphRunner(AgentGraphRunner):
3967
4068
AgentGraphRunner implementation for the OpenAI Agents SDK.
4169
42-
Runs the agent graph with the OpenAI Agents SDK and automatically records
43-
graph- and node-level AI metric data to the LaunchDarkly trackers on the
44-
graph definition and each node.
70+
Runs the agent graph with the OpenAI Agents SDK and collects graph- and
71+
node-level metrics. Tracking events are emitted by the managed layer
72+
(:class:`~ldai.ManagedAgentGraph`) from the returned
73+
:class:`~ldai.providers.types.AgentGraphRunnerResult`.
4574
4675
Requires ``openai-agents`` to be installed.
4776
"""
@@ -61,20 +90,19 @@ def __init__(
6190
self._tools = tools
6291
self._agent_name_map: Dict[str, str] = {}
6392
self._tool_name_map: Dict[str, str] = {}
64-
self._node_trackers: Dict[str, Any] = {}
93+
self._node_accumulators: Dict[str, _NodeMetricsAccumulator] = {}
6594

66-
async def run(self, input: Any) -> AgentGraphResult:
95+
async def run(self, input: Any) -> AgentGraphRunnerResult:
6796
"""
6897
Run the agent graph with the given input.
6998
7099
Builds the agent tree via reverse_traverse, then invokes the root
71-
agent with Runner.run(). Tracks path, latency, and invocation
72-
success/failure.
100+
agent with Runner.run(). Collects path, latency, and per-node metrics.
101+
Graph-level tracking events are emitted by the managed layer.
73102
74103
:param input: The string prompt to send to the agent graph
75-
:return: AgentGraphResult with the final output and metrics
104+
:return: AgentGraphRunnerResult with the final content and GraphMetrics
76105
"""
77-
tracker = self._graph.create_tracker()
78106
path: List[str] = []
79107
root_node = self._graph.root()
80108
root_key = root_node.get_key() if root_node else ''
@@ -86,24 +114,29 @@ async def run(self, input: Any) -> AgentGraphResult:
86114
state = _RunState(last_handoff_ns=start_ns, last_node_key=root_key)
87115
try:
88116
from agents import Runner
89-
root_agent = self._build_agents(path, state, tracker)
117+
root_agent = self._build_agents(path, state)
90118
result = await Runner.run(root_agent, input_str)
91119
self._flush_final_segment(state, result)
92-
self._track_tool_calls(result)
120+
self._collect_tool_calls(result)
93121

94-
duration = (time.perf_counter_ns() - start_ns) // 1_000_000
122+
duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000
95123
token_usage = get_ai_usage_from_response(result)
96124

97-
tracker.track_path(path)
98-
tracker.track_duration(duration)
99-
tracker.track_invocation_success()
100-
if token_usage is not None:
101-
tracker.track_total_tokens(token_usage)
125+
node_metrics = {
126+
key: acc.to_ldai_metrics()
127+
for key, acc in self._node_accumulators.items()
128+
}
102129

103-
return AgentGraphResult(
104-
output=str(result.final_output),
130+
return AgentGraphRunnerResult(
131+
content=str(result.final_output),
105132
raw=result,
106-
metrics=LDAIMetrics(success=True, usage=token_usage),
133+
metrics=GraphMetrics(
134+
success=True,
135+
path=path,
136+
duration_ms=duration_ms,
137+
usage=token_usage,
138+
node_metrics=node_metrics,
139+
),
107140
)
108141
except Exception as exc:
109142
if isinstance(exc, ImportError):
@@ -113,17 +146,19 @@ async def run(self, input: Any) -> AgentGraphResult:
113146
)
114147
else:
115148
log.warning(f'OpenAIAgentGraphRunner run failed: {exc}')
116-
duration = (time.perf_counter_ns() - start_ns) // 1_000_000
117-
tracker.track_duration(duration)
118-
tracker.track_invocation_failure()
119-
return AgentGraphResult(
120-
output='',
149+
duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000
150+
return AgentGraphRunnerResult(
151+
content='',
121152
raw=None,
122-
metrics=LDAIMetrics(success=False),
153+
metrics=GraphMetrics(
154+
success=False,
155+
path=path,
156+
duration_ms=duration_ms,
157+
),
123158
)
124159

125160
def _build_agents(
126-
self, path: List[str], state: _RunState, tracker: Any
161+
self, path: List[str], state: _RunState
127162
) -> Any:
128163
"""
129164
Build the agent tree from the graph definition via reverse_traverse.
@@ -133,7 +168,6 @@ def _build_agents(
133168
134169
:param path: Mutable list to accumulate the execution path
135170
:param state: Shared run state for tracking handoff timing and last node
136-
:param tracker: Graph-level tracker shared across the entire run
137171
:return: The root Agent instance
138172
"""
139173
try:
@@ -151,12 +185,12 @@ def _build_agents(
151185

152186
name_map: Dict[str, str] = {}
153187
tool_name_map: Dict[str, str] = {}
154-
node_trackers: Dict[str, Any] = {}
188+
node_accumulators: Dict[str, _NodeMetricsAccumulator] = {}
155189

156190
def build_node(node: AgentGraphNode, ctx: dict) -> Any:
157191
node_config = node.get_config()
158-
config_tracker = node_config.create_tracker()
159-
node_trackers[node_config.key] = config_tracker
192+
acc = _NodeMetricsAccumulator()
193+
node_accumulators[node_config.key] = acc
160194
model = node_config.model
161195

162196
if not model:
@@ -177,8 +211,7 @@ def build_node(node: AgentGraphNode, ctx: dict) -> Any:
177211
node_config.key,
178212
target_key,
179213
path,
180-
tracker,
181-
config_tracker,
214+
acc,
182215
state,
183216
),
184217
)
@@ -212,20 +245,19 @@ def build_node(node: AgentGraphNode, ctx: dict) -> Any:
212245
root = self._graph.reverse_traverse(fn=build_node)
213246
self._agent_name_map = name_map
214247
self._tool_name_map = tool_name_map
215-
self._node_trackers = node_trackers
248+
self._node_accumulators = node_accumulators
216249
return root
217250

218251
def _make_on_handoff(
219252
self,
220253
src: str,
221254
tgt: str,
222255
path: List[str],
223-
tracker: Any,
224-
config_tracker: Any,
256+
acc: _NodeMetricsAccumulator,
225257
state: _RunState,
226258
):
227259
def on_handoff(run_ctx: Any) -> None:
228-
self._handle_handoff(run_ctx, src, tgt, path, tracker, config_tracker, state)
260+
self._handle_handoff(run_ctx, src, tgt, path, acc, state)
229261
return on_handoff
230262

231263
def _handle_handoff(
@@ -234,13 +266,11 @@ def _handle_handoff(
234266
src: str,
235267
tgt: str,
236268
path: List[str],
237-
tracker: Any,
238-
config_tracker: Any,
269+
acc: _NodeMetricsAccumulator,
239270
state: _RunState,
240271
) -> None:
241272
path.append(tgt)
242273
state.last_node_key = tgt
243-
tracker.track_handoff_success(src, tgt)
244274

245275
now_ns = time.perf_counter_ns()
246276
duration_ms = (now_ns - state.last_handoff_ns) // 1_000_000
@@ -254,19 +284,15 @@ def _handle_handoff(
254284
except Exception:
255285
pass
256286

257-
if config_tracker is not None:
258-
if usage is not None:
259-
config_tracker.track_tokens(usage)
260-
if duration_ms is not None:
261-
config_tracker.track_duration(int(duration_ms))
262-
config_tracker.track_success()
287+
acc.set_usage(usage)
288+
acc.set_duration_ms(int(duration_ms))
263289

264290
def _flush_final_segment(self, state: _RunState, result: Any) -> None:
265291
"""Record duration/tokens for the last active agent (no handoff after it)."""
266292
if not state.last_node_key:
267293
return
268-
config_tracker = self._node_trackers.get(state.last_node_key)
269-
if config_tracker is None:
294+
acc = self._node_accumulators.get(state.last_node_key)
295+
if acc is None:
270296
return
271297

272298
now_ns = time.perf_counter_ns()
@@ -280,18 +306,16 @@ def _flush_final_segment(self, state: _RunState, result: Any) -> None:
280306
except Exception:
281307
pass
282308

283-
if usage is not None:
284-
config_tracker.track_tokens(usage)
285-
config_tracker.track_duration(int(duration_ms))
286-
config_tracker.track_success()
309+
acc.set_usage(usage)
310+
acc.set_duration_ms(int(duration_ms))
287311

288-
def _track_tool_calls(self, result: Any) -> None:
289-
"""Track all tool calls from the run result, attributed to the node that called them."""
312+
def _collect_tool_calls(self, result: Any) -> None:
313+
"""Collect all tool calls from the run result, attributed to the node that called them."""
290314
for agent_name, tool_fn_name in get_tool_calls_from_run_items(result.new_items):
291315
agent_key = self._agent_name_map.get(agent_name, agent_name)
292316
tool_name = self._tool_name_map.get(tool_fn_name)
293317
if tool_name is None:
294318
continue
295-
config_tracker = self._node_trackers.get(agent_key)
296-
if config_tracker is not None:
297-
config_tracker.track_tool_call(tool_name)
319+
acc = self._node_accumulators.get(agent_key)
320+
if acc is not None:
321+
acc.add_tool_call(tool_name)

packages/ai-providers/server-ai-openai/tests/test_openai_agent_graph_runner.py

Lines changed: 30 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,17 @@
55

66
from ldai.agent_graph import AgentGraphDefinition
77
from ldai.models import AIAgentGraphConfig, AIAgentConfig, Edge, ModelConfig, ProviderConfig
8-
from ldai.providers import AgentGraphResult, ToolRegistry
8+
from ldai.providers import ToolRegistry
9+
from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics
910
from ldai_openai.openai_agent_graph_runner import OpenAIAgentGraphRunner
1011
from ldai_openai.openai_runner_factory import OpenAIRunnerFactory
1112
from ldai.evaluator import Evaluator
1213

1314

1415
def _make_graph(enabled: bool = True) -> AgentGraphDefinition:
1516
"""Build a minimal single-node AgentGraphDefinition for testing."""
16-
node_tracker = MagicMock()
17-
graph_tracker = MagicMock()
18-
node_factory = MagicMock(return_value=node_tracker)
19-
graph_factory = MagicMock(return_value=graph_tracker)
17+
node_factory = MagicMock()
18+
graph_factory = MagicMock()
2019
root_config = AIAgentConfig(
2120
key='root-agent',
2221
enabled=enabled,
@@ -73,41 +72,44 @@ def test_openai_agent_graph_runner_stores_graph_and_tools():
7372

7473
@pytest.mark.asyncio
7574
async def test_openai_agent_graph_runner_run_raises_when_agents_not_installed():
75+
"""Import failure returns AgentGraphRunnerResult with success=False."""
7676
graph = _make_graph()
7777
runner = OpenAIAgentGraphRunner(graph, {})
7878

7979
with patch.dict('sys.modules', {'agents': None}):
80-
# The import inside run() will fail — runner should return failure result
81-
# rather than propagate the ImportError, since it's caught by the except block
8280
result = await runner.run("test input")
83-
assert isinstance(result, AgentGraphResult)
81+
assert isinstance(result, AgentGraphRunnerResult)
8482
assert result.metrics.success is False
8583

8684

8785
@pytest.mark.asyncio
88-
async def test_openai_agent_graph_runner_run_tracks_invocation_failure_on_exception():
86+
async def test_openai_agent_graph_runner_run_failure_returns_metrics():
87+
"""On import failure, returned GraphMetrics has success=False (no tracker needed)."""
8988
graph = _make_graph()
90-
tracker = graph.create_tracker.return_value
9189
runner = OpenAIAgentGraphRunner(graph, {})
9290

9391
with patch.dict('sys.modules', {'agents': None}):
9492
result = await runner.run("fail")
9593

94+
assert isinstance(result, AgentGraphRunnerResult)
9695
assert result.metrics.success is False
97-
tracker.track_invocation_failure.assert_called_once()
98-
tracker.track_duration.assert_called_once()
96+
assert result.metrics.duration_ms is not None
97+
# Runner no longer calls graph tracker — graph.create_tracker should NOT be called
98+
graph.create_tracker.assert_not_called()
9999

100100

101101
@pytest.mark.asyncio
102102
async def test_openai_agent_graph_runner_run_success():
103+
"""Successful run returns AgentGraphRunnerResult with populated GraphMetrics."""
103104
graph = _make_graph()
104-
tracker = graph.create_tracker.return_value
105105

106106
mock_result = MagicMock()
107107
mock_result.final_output = "agent answer"
108-
mock_result.context_wrapper.usage.total_tokens = 0
109-
mock_result.context_wrapper.usage.input_tokens = 0
110-
mock_result.context_wrapper.usage.output_tokens = 0
108+
mock_result.new_items = []
109+
mock_result.context_wrapper.usage.total_tokens = 10
110+
mock_result.context_wrapper.usage.input_tokens = 5
111+
mock_result.context_wrapper.usage.output_tokens = 5
112+
mock_result.context_wrapper.usage.request_usage_entries = []
111113

112114
mock_runner_module = MagicMock()
113115
mock_runner_module.run = AsyncMock(return_value=mock_result)
@@ -135,28 +137,19 @@ async def test_openai_agent_graph_runner_run_success():
135137
runner = OpenAIAgentGraphRunner(graph, {})
136138
result = await runner.run("find restaurants")
137139

138-
assert isinstance(result, AgentGraphResult)
139-
assert result.output == "agent answer"
140+
assert isinstance(result, AgentGraphRunnerResult)
141+
assert result.content == "agent answer"
142+
assert isinstance(result.metrics, GraphMetrics)
140143
assert result.metrics.success is True
141-
tracker.track_invocation_success.assert_called_once()
142-
tracker.track_path.assert_called_once()
143-
tracker.track_duration.assert_called_once()
144+
assert result.metrics.duration_ms is not None
145+
assert 'root-agent' in result.metrics.path
144146

145-
# The runner caches one tracker per node — verify it is the same instance
146-
# returned by create_tracker() and that all tracking calls hit it.
147-
node_factory = graph.get_node('root-agent').get_config().create_tracker
148-
149-
# The runner caches one tracker per node — verify it is the same instance
150-
# returned by create_tracker and that all tracking calls hit it.
151-
cached = runner._node_trackers['root-agent']
152-
assert cached is node_factory.return_value
153-
cached.track_duration.assert_called_once()
154-
cached.track_tokens.assert_called_once()
155-
cached.track_success.assert_called_once()
147+
# Runner no longer creates or calls the graph tracker
148+
graph.create_tracker.assert_not_called()
156149

157-
# Graph-level create_tracker is called exactly once per run (not twice)
158-
# so that handoff callbacks and run() share the same tracker instance.
159-
graph.create_tracker.assert_called_once()
150+
# Runner no longer creates per-node LDAIConfigTracker instances
151+
node_factory = graph.get_node('root-agent').get_config().create_tracker
152+
node_factory.assert_not_called()
160153

161-
# Node-level create_tracker is called exactly once per node.
162-
node_factory.assert_called_once()
154+
# Runner accumulates per-node metrics in _node_accumulators
155+
assert 'root-agent' in runner._node_accumulators

0 commit comments

Comments
 (0)