-
Notifications
You must be signed in to change notification settings - Fork 4.2k
feat: emit agent spans in RealtimeSession for SDK tracing #3573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a2d639b
ed34935
ceb50e0
b61f80f
0470b2d
9408465
ee9f76b
483f41e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |
| FunctionToolLookupKey, | ||
| get_function_tool_lookup_key_for_tool, | ||
| get_function_tool_namespace, | ||
| get_tool_trace_name_for_tool, | ||
| ) | ||
| from ..agent import Agent | ||
| from ..exceptions import UserError | ||
|
|
@@ -24,6 +25,9 @@ | |
| from ..run_context import RunContextWrapper, TContext | ||
| from ..tool import DEFAULT_APPROVAL_REJECTION_MESSAGE, FunctionTool, invoke_function_tool | ||
| from ..tool_context import ToolContext | ||
| from ..tracing import Span, agent_span | ||
| from ..tracing.span_data import AgentSpanData | ||
| from ..tracing.spans import NoOpSpan | ||
| from ..util._approvals import evaluate_needs_approval_setting | ||
| from .agent import RealtimeAgent | ||
| from .config import RealtimeRunConfig, RealtimeSessionModelSettings, RealtimeUserInput | ||
|
|
@@ -193,6 +197,7 @@ def __init__( | |
| self._guardrail_tasks: set[asyncio.Task[Any]] = set() | ||
| self._tool_call_tasks: set[asyncio.Task[Any]] = set() | ||
| self._async_tool_calls: bool = bool(self._run_config.get("async_tool_calls", True)) | ||
| self._current_agent_span: Span[AgentSpanData] | None = None | ||
|
|
||
| @property | ||
| def model(self) -> RealtimeModel: | ||
|
|
@@ -203,27 +208,59 @@ async def __aenter__(self) -> RealtimeSession: | |
| """Start the session by connecting to the model. After this, you will be able to stream | ||
| events from the model and send messages and audio to the model. | ||
| """ | ||
| # Add ourselves as a listener | ||
| self._model.add_listener(self) | ||
| # Create the agent span. Do not install it as the current ContextVar span: | ||
| # asyncio tasks inherit a snapshot of their parent's context, so a bg task | ||
| # cannot update the main task's context var. Installing the span would leave a | ||
| # stale (finished) span as "current" after any handoff that runs in a bg task. | ||
| # Agent spans are emitted as children of the enclosing trace without being set | ||
| # as current, which is correct and avoids all cross-task ContextVar management. | ||
| self._current_agent_span = self._make_agent_span(self._current_agent) | ||
| self._current_agent_span.start(mark_as_current=False) | ||
|
|
||
| model_config = self._model_config.copy() | ||
| model_config["initial_model_settings"] = await self._get_updated_model_settings_from_agent( | ||
| starting_settings=self._model_config.get("initial_model_settings", None), | ||
| agent=self._current_agent, | ||
| ) | ||
|
|
||
| # Connect to the model | ||
| await self._model.connect(model_config) | ||
|
|
||
| # Emit initial history update | ||
| await self._put_event( | ||
| RealtimeHistoryUpdated( | ||
| history=self._history, | ||
| info=self._event_info, | ||
| try: | ||
| # Add ourselves as a listener | ||
| self._model.add_listener(self) | ||
|
|
||
| model_config = self._model_config.copy() | ||
| ( | ||
| initial_settings, | ||
| resolved_tools, | ||
| enabled_handoffs, | ||
| ) = await self._get_updated_model_settings_from_agent( | ||
| starting_settings=self._model_config.get("initial_model_settings", None), | ||
| agent=self._current_agent, | ||
| ) | ||
| model_config["initial_model_settings"] = initial_settings | ||
|
|
||
| # Reuse the resolved tools/handoffs returned above — avoids a second call and | ||
| # ensures span metadata matches what was actually sent to the model, including | ||
| # any overrides applied by starting_settings. | ||
| if not isinstance(self._current_agent_span, NoOpSpan): | ||
| self._current_agent_span.span_data.tools = [ | ||
| n for t in resolved_tools if (n := get_tool_trace_name_for_tool(t)) is not None | ||
| ] or None | ||
| self._current_agent_span.span_data.handoffs = [ | ||
| h.agent_name for h in enabled_handoffs | ||
| ] or None | ||
|
|
||
| # Connect to the model | ||
| await self._model.connect(model_config) | ||
|
|
||
| # Emit initial history update | ||
| await self._put_event( | ||
| RealtimeHistoryUpdated( | ||
| history=self._history, | ||
| info=self._event_info, | ||
| ) | ||
| ) | ||
| ) | ||
|
|
||
| return self | ||
| return self | ||
| except BaseException: | ||
| # __aexit__ is not called when __aenter__ raises, so clean up the span here. | ||
| if self._current_agent_span is not None: | ||
| self._current_agent_span.finish(reset_current=False) | ||
| self._current_agent_span = None | ||
| raise | ||
|
|
||
| async def enter(self) -> RealtimeSession: | ||
| """Enter the async context manager. We strongly recommend using the async context manager | ||
|
|
@@ -278,13 +315,31 @@ async def interrupt(self) -> None: | |
|
|
||
| async def update_agent(self, agent: RealtimeAgent) -> None: | ||
| """Update the active agent for this session and apply its settings to the model.""" | ||
| self._current_agent = agent | ||
| # Finish the outgoing agent span before switching agents, mirroring the handoff path. | ||
| if self._current_agent_span is not None: | ||
| self._current_agent_span.finish(reset_current=False) | ||
|
|
||
| updated_settings = await self._get_updated_model_settings_from_agent( | ||
| self._current_agent = agent | ||
| self._current_agent_span = self._make_agent_span(self._current_agent) | ||
| self._current_agent_span.start(mark_as_current=False) | ||
|
|
||
| ( | ||
| updated_settings, | ||
| resolved_tools, | ||
| enabled_handoffs, | ||
| ) = await self._get_updated_model_settings_from_agent( | ||
| starting_settings=None, | ||
| agent=self._current_agent, | ||
| ) | ||
|
|
||
| if not isinstance(self._current_agent_span, NoOpSpan): | ||
| self._current_agent_span.span_data.tools = [ | ||
| n for t in resolved_tools if (n := get_tool_trace_name_for_tool(t)) is not None | ||
| ] or None | ||
| self._current_agent_span.span_data.handoffs = [ | ||
| h.agent_name for h in enabled_handoffs | ||
| ] or None | ||
|
|
||
| await self._model.send_event( | ||
| RealtimeModelSendSessionUpdate(session_settings=updated_settings) | ||
| ) | ||
|
|
@@ -815,15 +870,43 @@ async def _handle_tool_call( | |
| # Store previous agent for event | ||
| previous_agent = agent | ||
|
|
||
| # Finish the span for the outgoing agent. Use reset_current=False because this | ||
| # runs inside an asyncio background task; resetting a token from a different | ||
| # context raises ValueError. | ||
| if self._current_agent_span is not None: | ||
| self._current_agent_span.finish(reset_current=False) | ||
|
|
||
| # Update current agent | ||
| self._current_agent = result | ||
|
|
||
| # Get updated model settings from new agent | ||
| updated_settings = await self._get_updated_model_settings_from_agent( | ||
| # Create the incoming agent span. Because we never install agent spans as | ||
| # current (see __aenter__), this background task's context already holds the | ||
| # trace root as the current span — provider.create_span() will parent the new | ||
| # span to the trace root, making it a sibling of the outgoing agent span. | ||
| self._current_agent_span = self._make_agent_span(self._current_agent) | ||
| self._current_agent_span.start(mark_as_current=False) | ||
|
Comment on lines
+886
to
+887
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
After a handoff, the outgoing agent span is finished with Useful? React with 👍 / 👎.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in commit b61f80f. Before calling _make_agent_span() for the incoming agent, the handoff handler now temporarily sets the current span to None via Scope.set_current_span(None) and immediately restores it. This ensures provider.create_span() sees no current span and assigns parent_id=None, making the new agent span a sibling under the trace root rather than a child of the finished outgoing span. |
||
|
|
||
| # Get updated model settings from new agent; reuse resolved tools and | ||
| # handoffs for span metadata to avoid a redundant second call. | ||
| ( | ||
| updated_settings, | ||
| resolved_tools, | ||
| enabled_handoffs, | ||
| ) = await self._get_updated_model_settings_from_agent( | ||
| starting_settings=None, | ||
| agent=self._current_agent, | ||
| ) | ||
|
|
||
| if not isinstance(self._current_agent_span, NoOpSpan): | ||
| self._current_agent_span.span_data.tools = [ | ||
| n | ||
| for t in resolved_tools | ||
| if (n := get_tool_trace_name_for_tool(t)) is not None | ||
| ] or None | ||
| self._current_agent_span.span_data.handoffs = [ | ||
| h.agent_name for h in enabled_handoffs | ||
| ] or None | ||
|
|
||
| # Send handoff event | ||
| await self._put_event( | ||
| RealtimeHandoffEvent( | ||
|
|
@@ -1235,6 +1318,11 @@ async def _cleanup(self) -> None: | |
| self._wake_event_iterators() | ||
| return | ||
|
|
||
| # Finish the active agent span. | ||
| if self._current_agent_span is not None: | ||
| self._current_agent_span.finish(reset_current=False) | ||
|
Comment on lines
+1322
to
+1323
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Fresh evidence is that the initial session span is still started with Useful? React with 👍 / 👎.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in commit ceb50e0. aenter now calls span.start(mark_as_current=False) then Scope.set_current_span() explicitly, storing the context-var token in self._initial_span_token. _cleanup (always in the same task as aenter via aexit) calls Scope.reset_current_span() on that token after finishing the current span, so the stale span is no longer current in the context regardless of whether a handoff occurred. |
||
| self._current_agent_span = None | ||
|
|
||
| # Cancel and cleanup guardrail tasks | ||
| self._cleanup_guardrail_tasks() | ||
| self._cleanup_tool_call_tasks() | ||
|
|
@@ -1253,11 +1341,28 @@ async def _cleanup(self) -> None: | |
| self._closed = True | ||
| self._wake_event_iterators() | ||
|
|
||
| def _make_agent_span(self, agent: RealtimeAgent) -> Span[AgentSpanData]: | ||
| """Create a new agent span for the given agent, respecting tracing_disabled. | ||
|
|
||
| Tool and handoff names are intentionally omitted here. Callers must populate | ||
| span_data.tools and span_data.handoffs from the tuple returned by | ||
| _get_updated_model_settings_from_agent() so that metadata reflects what was | ||
| actually sent to the model (after is_enabled filtering and any model_config overrides). | ||
| """ | ||
| disabled: bool = bool(self._run_config.get("tracing_disabled", False)) | ||
| return agent_span(name=agent.name, disabled=disabled) | ||
|
|
||
| async def _get_updated_model_settings_from_agent( | ||
| self, | ||
| starting_settings: RealtimeSessionModelSettings | None, | ||
| agent: RealtimeAgent, | ||
| ) -> RealtimeSessionModelSettings: | ||
| ) -> tuple[RealtimeSessionModelSettings, list[Any], list[Any]]: | ||
| """Return (settings, final_tools, final_handoffs). | ||
|
|
||
| final_tools and final_handoffs reflect the values in the returned settings after | ||
| starting_settings overrides are applied. Callers must use these for span metadata | ||
| to ensure the span reports exactly what was sent to the model. | ||
| """ | ||
| # Start with the merged base settings from run and model configuration. | ||
| updated_settings = self._base_model_settings.copy() | ||
|
|
||
|
|
@@ -1273,15 +1378,18 @@ async def _get_updated_model_settings_from_agent( | |
| updated_settings["tools"] = tools or [] | ||
| updated_settings["handoffs"] = handoffs or [] | ||
|
|
||
| # Apply starting settings (from model config) next | ||
| # Apply starting_settings (from model config) — may override tools and handoffs. | ||
| if starting_settings: | ||
| updated_settings.update(starting_settings) | ||
|
|
||
| disable_tracing = self._run_config.get("tracing_disabled", False) | ||
| if disable_tracing: | ||
| updated_settings["tracing"] = None | ||
|
|
||
| return updated_settings | ||
| # Return the final tools/handoffs AFTER overrides so span metadata matches the model. | ||
| final_tools = list(updated_settings.get("tools") or []) | ||
| final_handoffs = list(updated_settings.get("handoffs") or []) | ||
| return updated_settings, final_tools, final_handoffs | ||
|
|
||
| @classmethod | ||
| async def _get_handoffs( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the default
async_tool_calls=True, the handoff runs in a background task and this finishes the outgoing agent span while the task that entered the session still has that same span installed as its current span until__aexit__. Any SDK/custom span the application starts after the handoff but before closing the session is then parented under an already-ended previous agent becauseDefaultTraceProvider.create_span()readsScope.get_current_span(), so handoff traces can mis-parent user spans to the wrong agent.Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 9408465 with an architectural change.
The root cause is that asyncio tasks inherit a snapshot of the parent context — a background task cannot update the main task's ContextVar. So after a handoff runs in a bg task and finishes the outgoing span, the main task's ContextVar still holds the old finished span as current. There is no way to fix this while keeping the pattern of installing agent spans as the ContextVar current span.
The fix: never install agent spans as the ContextVar current span.
Scope.set_current_spanis removed from__aenter__,_cleanup, and the handoff handler. Agent spans are created, tracked, and finished viaself._current_agent_spanwithout touching the context var. Consequences:Scope.set_current_span(None)trick._initial_span_tokenand all associated cross-task ValueError handling are removed entirely.