Skip to content
Open
158 changes: 133 additions & 25 deletions src/agents/realtime/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
FunctionToolLookupKey,
get_function_tool_lookup_key_for_tool,
get_function_tool_namespace,
get_tool_trace_name_for_tool,
)
from ..agent import Agent
from ..exceptions import UserError
Expand All @@ -24,6 +25,9 @@
from ..run_context import RunContextWrapper, TContext
from ..tool import DEFAULT_APPROVAL_REJECTION_MESSAGE, FunctionTool, invoke_function_tool
from ..tool_context import ToolContext
from ..tracing import Span, agent_span
from ..tracing.span_data import AgentSpanData
from ..tracing.spans import NoOpSpan
from ..util._approvals import evaluate_needs_approval_setting
from .agent import RealtimeAgent
from .config import RealtimeRunConfig, RealtimeSessionModelSettings, RealtimeUserInput
Expand Down Expand Up @@ -193,6 +197,7 @@ def __init__(
self._guardrail_tasks: set[asyncio.Task[Any]] = set()
self._tool_call_tasks: set[asyncio.Task[Any]] = set()
self._async_tool_calls: bool = bool(self._run_config.get("async_tool_calls", True))
self._current_agent_span: Span[AgentSpanData] | None = None

@property
def model(self) -> RealtimeModel:
Expand All @@ -203,27 +208,59 @@ async def __aenter__(self) -> RealtimeSession:
"""Start the session by connecting to the model. After this, you will be able to stream
events from the model and send messages and audio to the model.
"""
# Add ourselves as a listener
self._model.add_listener(self)
# Create the agent span. Do not install it as the current ContextVar span:
# asyncio tasks inherit a snapshot of their parent's context, so a bg task
# cannot update the main task's context var. Installing the span would leave a
# stale (finished) span as "current" after any handoff that runs in a bg task.
# Agent spans are emitted as children of the enclosing trace without being set
# as current, which is correct and avoids all cross-task ContextVar management.
self._current_agent_span = self._make_agent_span(self._current_agent)
self._current_agent_span.start(mark_as_current=False)

model_config = self._model_config.copy()
model_config["initial_model_settings"] = await self._get_updated_model_settings_from_agent(
starting_settings=self._model_config.get("initial_model_settings", None),
agent=self._current_agent,
)

# Connect to the model
await self._model.connect(model_config)

# Emit initial history update
await self._put_event(
RealtimeHistoryUpdated(
history=self._history,
info=self._event_info,
try:
# Add ourselves as a listener
self._model.add_listener(self)

model_config = self._model_config.copy()
(
initial_settings,
resolved_tools,
enabled_handoffs,
) = await self._get_updated_model_settings_from_agent(
starting_settings=self._model_config.get("initial_model_settings", None),
agent=self._current_agent,
)
model_config["initial_model_settings"] = initial_settings

# Reuse the resolved tools/handoffs returned above — avoids a second call and
# ensures span metadata matches what was actually sent to the model, including
# any overrides applied by starting_settings.
if not isinstance(self._current_agent_span, NoOpSpan):
self._current_agent_span.span_data.tools = [
n for t in resolved_tools if (n := get_tool_trace_name_for_tool(t)) is not None
] or None
self._current_agent_span.span_data.handoffs = [
h.agent_name for h in enabled_handoffs
] or None

# Connect to the model
await self._model.connect(model_config)

# Emit initial history update
await self._put_event(
RealtimeHistoryUpdated(
history=self._history,
info=self._event_info,
)
)
)

return self
return self
except BaseException:
# __aexit__ is not called when __aenter__ raises, so clean up the span here.
if self._current_agent_span is not None:
self._current_agent_span.finish(reset_current=False)
self._current_agent_span = None
raise

async def enter(self) -> RealtimeSession:
"""Enter the async context manager. We strongly recommend using the async context manager
Expand Down Expand Up @@ -278,13 +315,31 @@ async def interrupt(self) -> None:

async def update_agent(self, agent: RealtimeAgent) -> None:
"""Update the active agent for this session and apply its settings to the model."""
self._current_agent = agent
# Finish the outgoing agent span before switching agents, mirroring the handoff path.
if self._current_agent_span is not None:
self._current_agent_span.finish(reset_current=False)

updated_settings = await self._get_updated_model_settings_from_agent(
self._current_agent = agent
self._current_agent_span = self._make_agent_span(self._current_agent)
self._current_agent_span.start(mark_as_current=False)

(
updated_settings,
resolved_tools,
enabled_handoffs,
) = await self._get_updated_model_settings_from_agent(
starting_settings=None,
agent=self._current_agent,
)

if not isinstance(self._current_agent_span, NoOpSpan):
self._current_agent_span.span_data.tools = [
n for t in resolved_tools if (n := get_tool_trace_name_for_tool(t)) is not None
] or None
self._current_agent_span.span_data.handoffs = [
h.agent_name for h in enabled_handoffs
] or None

await self._model.send_event(
RealtimeModelSendSessionUpdate(session_settings=updated_settings)
)
Expand Down Expand Up @@ -815,15 +870,43 @@ async def _handle_tool_call(
# Store previous agent for event
previous_agent = agent

# Finish the span for the outgoing agent. Use reset_current=False because this
# runs inside an asyncio background task; resetting a token from a different
# context raises ValueError.
if self._current_agent_span is not None:
self._current_agent_span.finish(reset_current=False)
Comment on lines +876 to +877
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clear finished agent span after handoff

With the default async_tool_calls=True, the handoff runs in a background task and this finishes the outgoing agent span while the task that entered the session still has that same span installed as its current span until __aexit__. Any SDK/custom span the application starts after the handoff but before closing the session is then parented under an already-ended previous agent because DefaultTraceProvider.create_span() reads Scope.get_current_span(), so handoff traces can mis-parent user spans to the wrong agent.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 9408465 with an architectural change.

The root cause is that asyncio tasks inherit a snapshot of the parent context — a background task cannot update the main task's ContextVar. So after a handoff runs in a bg task and finishes the outgoing span, the main task's ContextVar still holds the old finished span as current. There is no way to fix this while keeping the pattern of installing agent spans as the ContextVar current span.

The fix: never install agent spans as the ContextVar current span. Scope.set_current_span is removed from __aenter__, _cleanup, and the handoff handler. Agent spans are created, tracked, and finished via self._current_agent_span without touching the context var. Consequences:

  • No stale span: the main task's context is always the enclosing trace root, before and after any handoff.
  • Sibling relationship is now natural: the background task inherits trace-root as current at task-creation time, so the incoming agent span's parent is the trace root without needing the Scope.set_current_span(None) trick.
  • _initial_span_token and all associated cross-task ValueError handling are removed entirely.


# Update current agent
self._current_agent = result

# Get updated model settings from new agent
updated_settings = await self._get_updated_model_settings_from_agent(
# Create the incoming agent span. Because we never install agent spans as
# current (see __aenter__), this background task's context already holds the
# trace root as the current span — provider.create_span() will parent the new
# span to the trace root, making it a sibling of the outgoing agent span.
self._current_agent_span = self._make_agent_span(self._current_agent)
self._current_agent_span.start(mark_as_current=False)
Comment on lines +886 to +887
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Parent handoff spans to the active trace, not the old agent

After a handoff, the outgoing agent span is finished with reset_current=False, so the task's current span is still the outgoing agent when this new span is created. Because _make_agent_span() relies on the ambient current span, every incoming agent span is recorded as a child of the ended previous agent rather than as the next agent span under the trace/task like the regular runner does after handoffs, which makes multi-agent realtime traces misleading.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit b61f80f. Before calling _make_agent_span() for the incoming agent, the handoff handler now temporarily sets the current span to None via Scope.set_current_span(None) and immediately restores it. This ensures provider.create_span() sees no current span and assigns parent_id=None, making the new agent span a sibling under the trace root rather than a child of the finished outgoing span.


# Get updated model settings from new agent; reuse resolved tools and
# handoffs for span metadata to avoid a redundant second call.
(
updated_settings,
resolved_tools,
enabled_handoffs,
) = await self._get_updated_model_settings_from_agent(
starting_settings=None,
agent=self._current_agent,
)

if not isinstance(self._current_agent_span, NoOpSpan):
self._current_agent_span.span_data.tools = [
n
for t in resolved_tools
if (n := get_tool_trace_name_for_tool(t)) is not None
] or None
self._current_agent_span.span_data.handoffs = [
h.agent_name for h in enabled_handoffs
] or None

# Send handoff event
await self._put_event(
RealtimeHandoffEvent(
Expand Down Expand Up @@ -1235,6 +1318,11 @@ async def _cleanup(self) -> None:
self._wake_event_iterators()
return

# Finish the active agent span.
if self._current_agent_span is not None:
self._current_agent_span.finish(reset_current=False)
Comment on lines +1322 to +1323
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Reset the initial agent span on cleanup

Fresh evidence is that the initial session span is still started with mark_as_current=True, while cleanup now always calls finish(reset_current=False). After any successfully entered realtime session closes without a handoff, the ended agent span remains in the current context; later spans in the same task and trace, or in a subsequent trace started in that task, will be parented under this stale span (even across trace IDs) instead of the active trace/root. The cross-task handoff case needs care, but the initial span token created by __aenter__ should still be reset when cleanup runs in that same context.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit ceb50e0. `__aenter__` now calls `span.start(mark_as_current=False)` and then `Scope.set_current_span()` explicitly, storing the context-var token in `self._initial_span_token`. `_cleanup` (which always runs in the same task as `__aenter__`, via `__aexit__`) calls `Scope.reset_current_span()` on that token after finishing the current span, so the stale span is no longer current in the context regardless of whether a handoff occurred.

self._current_agent_span = None

# Cancel and cleanup guardrail tasks
self._cleanup_guardrail_tasks()
self._cleanup_tool_call_tasks()
Expand All @@ -1253,11 +1341,28 @@ async def _cleanup(self) -> None:
self._closed = True
self._wake_event_iterators()

def _make_agent_span(self, agent: RealtimeAgent) -> Span[AgentSpanData]:
    """Build an agent span named after ``agent``, honoring ``tracing_disabled``.

    The span is requested with ``disabled=True`` when the run config's
    ``tracing_disabled`` entry is truthy; a missing entry is treated as False.

    ``span_data.tools`` and ``span_data.handoffs`` are deliberately not set here.
    Callers must fill them in from the tuple returned by
    ``_get_updated_model_settings_from_agent()`` so the metadata reflects what was
    actually sent to the model (after is_enabled filtering and any model_config
    overrides).
    """
    tracing_off = bool(self._run_config.get("tracing_disabled", False))
    return agent_span(name=agent.name, disabled=tracing_off)

async def _get_updated_model_settings_from_agent(
self,
starting_settings: RealtimeSessionModelSettings | None,
agent: RealtimeAgent,
) -> RealtimeSessionModelSettings:
) -> tuple[RealtimeSessionModelSettings, list[Any], list[Any]]:
"""Return (settings, final_tools, final_handoffs).

final_tools and final_handoffs reflect the values in the returned settings after
starting_settings overrides are applied. Callers must use these for span metadata
to ensure the span reports exactly what was sent to the model.
"""
# Start with the merged base settings from run and model configuration.
updated_settings = self._base_model_settings.copy()

Expand All @@ -1273,15 +1378,18 @@ async def _get_updated_model_settings_from_agent(
updated_settings["tools"] = tools or []
updated_settings["handoffs"] = handoffs or []

# Apply starting settings (from model config) next
# Apply starting_settings (from model config) — may override tools and handoffs.
if starting_settings:
updated_settings.update(starting_settings)

disable_tracing = self._run_config.get("tracing_disabled", False)
if disable_tracing:
updated_settings["tracing"] = None

return updated_settings
# Return the final tools/handoffs AFTER overrides so span metadata matches the model.
final_tools = list(updated_settings.get("tools") or [])
final_handoffs = list(updated_settings.get("handoffs") or [])
return updated_settings, final_tools, final_handoffs

@classmethod
async def _get_handoffs(
Expand Down
16 changes: 12 additions & 4 deletions tests/realtime/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,11 @@ async def close(self):
@pytest.fixture
def mock_agent():
    """Provide a ``RealtimeAgent``-spec'd mock with empty tools, handoffs and output guardrails."""
    stub = Mock(spec=RealtimeAgent)
    stub.name = "mock_agent"
    stub.get_all_tools = AsyncMock(return_value=[])

    # PropertyMock objects are attached to the mock's type rather than to the
    # instance; each property gets its own PropertyMock and its own empty list.
    stub_type = type(stub)
    for prop_name in ("handoffs", "tools", "output_guardrails"):
        setattr(stub_type, prop_name, PropertyMock(return_value=[]))
    return stub

Expand Down Expand Up @@ -2463,9 +2465,11 @@ async def test_session_gets_model_settings_from_agent_during_connection(self):

# Create agent with specific settings
agent = Mock(spec=RealtimeAgent)
agent.name = "test_agent"
agent.get_system_prompt = AsyncMock(return_value="Test agent instructions")
agent.get_all_tools = AsyncMock(return_value=[{"type": "function", "name": "test_tool"}])
agent.handoffs = []
agent.tools = []

session = RealtimeSession(mock_model, agent, None)

Expand All @@ -2492,9 +2496,11 @@ async def test_model_config_overrides_model_settings_not_agent(self):
mock_model.add_listener = Mock()

agent = Mock(spec=RealtimeAgent)
agent.name = "test_agent"
agent.get_system_prompt = AsyncMock(return_value="Agent instructions")
agent.get_all_tools = AsyncMock(return_value=[{"type": "function", "name": "agent_tool"}])
agent.handoffs = []
agent.tools = []

# Provide model config with settings
model_config: RealtimeModelConfig = {
Expand Down Expand Up @@ -2530,8 +2536,10 @@ async def test_handoffs_are_included_in_model_settings(self):

# Create agent with handoffs
agent = Mock(spec=RealtimeAgent)
agent.name = "test_agent"
agent.get_system_prompt = AsyncMock(return_value="Agent with handoffs")
agent.get_all_tools = AsyncMock(return_value=[])
agent.tools = []

# Create a mock handoff
handoff_agent = Mock(spec=RealtimeAgent)
Expand Down Expand Up @@ -2619,7 +2627,7 @@ async def mock_get_handoffs(cls, agent, context_wrapper):
m.setattr("agents.realtime.session.RealtimeSession._get_handoffs", mock_get_handoffs)

# Test the method directly
model_settings = await session._get_updated_model_settings_from_agent(
model_settings, _, _ = await session._get_updated_model_settings_from_agent(
starting_settings=model_config_initial_settings, agent=agent
)

Expand Down Expand Up @@ -2669,7 +2677,7 @@ async def mock_get_handoffs(cls, agent, context_wrapper):
with pytest.MonkeyPatch().context() as m:
m.setattr("agents.realtime.session.RealtimeSession._get_handoffs", mock_get_handoffs)

model_settings = await session._get_updated_model_settings_from_agent(
model_settings, _, _ = await session._get_updated_model_settings_from_agent(
starting_settings=None, # No initial settings
agent=agent,
)
Expand Down Expand Up @@ -2715,7 +2723,7 @@ async def mock_get_handoffs(cls, agent, context_wrapper):
with pytest.MonkeyPatch().context() as m:
m.setattr("agents.realtime.session.RealtimeSession._get_handoffs", mock_get_handoffs)

model_settings = await session._get_updated_model_settings_from_agent(
model_settings, _, _ = await session._get_updated_model_settings_from_agent(
starting_settings=model_config_settings, agent=agent
)

Expand Down Expand Up @@ -2762,7 +2770,7 @@ async def mock_get_handoffs(cls, agent, context_wrapper):
mock_get_handoffs,
)

model_settings = await session._get_updated_model_settings_from_agent(
model_settings, _, _ = await session._get_updated_model_settings_from_agent(
starting_settings=None,
agent=agent,
)
Expand Down
2 changes: 2 additions & 0 deletions tests/realtime/test_session_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ async def interrupt(self) -> None:
def fake_agent():
    """Return a bare ``Mock`` standing in for an agent in exception tests.

    The mock exposes a ``name``, empty ``handoffs`` and ``tools`` lists, and async
    ``get_all_tools`` / ``get_system_prompt`` callables with canned return values.
    """
    stub = Mock()
    # ``name`` goes through configure_mock (not the Mock constructor) so it
    # becomes a plain attribute instead of the mock's repr name.
    stub.configure_mock(
        name="fake_agent",
        get_all_tools=AsyncMock(return_value=[]),
        get_system_prompt=AsyncMock(return_value="test instructions"),
        handoffs=[],
        tools=[],
    )
    return stub


Expand Down
Loading