Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
from .stream_events import (
AgentUpdatedStreamEvent,
RawResponsesStreamEvent,
ReasoningDeltaEvent,
RunItemStreamEvent,
StreamEvent,
)
Expand Down Expand Up @@ -393,6 +394,7 @@ def enable_verbose_stdout_logging():
"RawResponsesStreamEvent",
"RunItemStreamEvent",
"AgentUpdatedStreamEvent",
"ReasoningDeltaEvent",
"StreamEvent",
"FunctionTool",
"FunctionToolResult",
Expand Down
32 changes: 31 additions & 1 deletion src/agents/run_internal/run_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
from collections.abc import Awaitable, Callable, Mapping
from typing import Any, TypeVar, cast

from openai.types.responses import Response, ResponseCompletedEvent, ResponseOutputItemDoneEvent
from openai.types.responses import Response, ResponseCompletedEvent, ResponseCreatedEvent, ResponseOutputItemDoneEvent
from openai.types.responses.response_reasoning_text_delta_event import (
ResponseReasoningTextDeltaEvent,
)
from openai.types.responses.response_reasoning_summary_text_delta_event import (
ResponseReasoningSummaryTextDeltaEvent,
)
from openai.types.responses.response_output_item import McpCall, McpListTools
from openai.types.responses.response_prompt_param import ResponsePromptParam
from openai.types.responses.response_reasoning_item import ResponseReasoningItem
Expand Down Expand Up @@ -60,6 +66,7 @@
from ..stream_events import (
AgentUpdatedStreamEvent,
RawResponsesStreamEvent,
ReasoningDeltaEvent,
RunItemStreamEvent,
)
from ..tool import FunctionTool, Tool, dispose_resolved_computers
Expand Down Expand Up @@ -1103,6 +1110,8 @@ async def run_single_turn_streamed(
emitted_tool_call_ids: set[str] = set()
emitted_reasoning_item_ids: set[str] = set()
emitted_tool_search_fingerprints: set[str] = set()
# Accumulated reasoning text for ReasoningDeltaEvent snapshot field.
_reasoning_snapshot: str = ""
# Precompute the lookup map used for streaming descriptions. Function tools use the same
# collision-free lookup keys as runtime dispatch, including deferred top-level aliases.
tool_map: dict[NamedToolLookupKey, Any] = cast(
Expand Down Expand Up @@ -1286,6 +1295,27 @@ async def rewind_model_request() -> None:
async for event in retry_stream:
streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event))

# Reset the reasoning snapshot at the start of each new response attempt
# (e.g., after a retry) so the snapshot field never contains stale text
# from a failed previous attempt.
if isinstance(event, ResponseCreatedEvent):
_reasoning_snapshot = ""

# Emit a ReasoningDeltaEvent for reasoning/thinking deltas so consumers don't have
# to unwrap the raw event themselves.
if isinstance(event, ResponseReasoningSummaryTextDeltaEvent):
delta_text: str = event.delta or ""
_reasoning_snapshot += delta_text
streamed_result._event_queue.put_nowait(
ReasoningDeltaEvent(delta=delta_text, snapshot=_reasoning_snapshot)
)
elif isinstance(event, ResponseReasoningTextDeltaEvent):
delta_text = event.delta or ""
_reasoning_snapshot += delta_text
streamed_result._event_queue.put_nowait(
ReasoningDeltaEvent(delta=delta_text, snapshot=_reasoning_snapshot)
)

terminal_response: Response | None = None
if isinstance(event, ResponseCompletedEvent):
terminal_response = event.response
Expand Down
27 changes: 26 additions & 1 deletion src/agents/stream_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,30 @@ class AgentUpdatedStreamEvent:
type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"


StreamEvent: TypeAlias = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent]
@dataclass
class ReasoningDeltaEvent:
    """Streaming notification carrying one reasoning/thinking text fragment.

    Convenience wrapper around the raw ``response.reasoning_summary_text.delta``
    and ``response.reasoning_text.delta`` events so consumers don't have to
    unwrap them themselves. Surfaces both OpenAI o-series reasoning summaries
    and third-party ``delta.reasoning`` fields (e.g. DeepSeek-R1 via LiteLLM).
    """

    delta: str
    """The incremental reasoning text fragment."""

    snapshot: str
    """The full reasoning text accumulated so far in this turn."""

    type: Literal["reasoning_delta"] = "reasoning_delta"
    """The type of the event."""


# Discriminated union of everything `stream_events()` can yield. Narrow with
# isinstance() (or the `type` field) before touching kind-specific attributes.
StreamEvent: TypeAlias = Union[
    RawResponsesStreamEvent,
    RunItemStreamEvent,
    AgentUpdatedStreamEvent,
    ReasoningDeltaEvent,
]
"""A streaming event from an agent."""
147 changes: 147 additions & 0 deletions tests/test_reasoning_delta_stream_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Tests for ReasoningDeltaEvent stream event (issue #825)."""

from __future__ import annotations

import pytest

from agents import Agent, Runner
from agents.stream_events import ReasoningDeltaEvent, RawResponsesStreamEvent

from openai.types.responses.response_reasoning_item import ResponseReasoningItem, Summary

from .fake_model import FakeModel
from .test_responses import get_text_message


def _make_reasoning_item(text: str) -> ResponseReasoningItem:
    """Build a minimal reasoning output item whose summary holds *text*."""
    summary_entry = Summary(text=text, type="summary_text")
    return ResponseReasoningItem(id="rs_test", type="reasoning", summary=[summary_entry])


@pytest.mark.asyncio
async def test_reasoning_delta_event_emitted_during_streaming() -> None:
    """ReasoningDeltaEvent is emitted when the model streams a reasoning summary delta."""
    fake = FakeModel()
    fake.set_next_output(
        [
            _make_reasoning_item("Let me think..."),
            get_text_message("Answer"),
        ]
    )

    streamed = Runner.run_streamed(Agent(name="A", model=fake), input="hi")

    collected = [
        event
        async for event in streamed.stream_events()
        if isinstance(event, ReasoningDeltaEvent)
    ]

    assert len(collected) >= 1
    for ev in collected:
        assert isinstance(ev.delta, str)
        assert isinstance(ev.snapshot, str)
        assert ev.type == "reasoning_delta"


@pytest.mark.asyncio
async def test_reasoning_delta_snapshot_accumulates() -> None:
    """The snapshot field grows monotonically across delta events.

    The original version guarded its final containment check with
    ``if snapshots:`` and never asserted that any event arrived, so a
    regression that stopped emitting ReasoningDeltaEvent would pass this
    test vacuously. Require at least one event, and check the stronger
    prefix property (each snapshot extends the previous one, since deltas
    are appended within a single response).
    """
    model = FakeModel()
    model.set_next_output([
        _make_reasoning_item("Hello world"),
        get_text_message("done"),
    ])

    agent = Agent(name="A", model=model)
    result = Runner.run_streamed(agent, input="hi")

    snapshots: list[str] = []
    async for event in result.stream_events():
        if isinstance(event, ReasoningDeltaEvent):
            snapshots.append(event.snapshot)

    # The fixture includes a reasoning item, so deltas must be emitted.
    assert snapshots, "Expected at least one ReasoningDeltaEvent but none were emitted"

    # Each snapshot must extend the previous one (delta text is appended).
    for prev, curr in zip(snapshots, snapshots[1:]):
        assert curr.startswith(prev)

    # Last snapshot must contain the full reasoning text.
    assert "Hello world" in snapshots[-1]


@pytest.mark.asyncio
async def test_no_reasoning_delta_event_without_reasoning() -> None:
    """ReasoningDeltaEvent is not emitted when there is no reasoning in the response."""
    fake = FakeModel()
    fake.set_next_output([get_text_message("plain text answer")])

    streamed = Runner.run_streamed(Agent(name="A", model=fake), input="hi")

    unexpected = [
        event
        async for event in streamed.stream_events()
        if isinstance(event, ReasoningDeltaEvent)
    ]
    assert not unexpected, (
        "Got unexpected ReasoningDeltaEvent for a plain text response"
    )


@pytest.mark.asyncio
async def test_reasoning_delta_event_type_field() -> None:
    """ReasoningDeltaEvent.type is always 'reasoning_delta'."""
    fake = FakeModel()
    fake.set_next_output([
        _make_reasoning_item("some reasoning"),
        get_text_message("answer"),
    ])

    streamed = Runner.run_streamed(Agent(name="A", model=fake), input="hi")

    first: ReasoningDeltaEvent | None = None
    async for event in streamed.stream_events():
        if isinstance(event, ReasoningDeltaEvent):
            first = event
            break

    assert first is not None, "Expected at least one ReasoningDeltaEvent but none were emitted"
    assert first.type == "reasoning_delta"


@pytest.mark.asyncio
async def test_raw_response_events_still_emitted_alongside_reasoning_delta() -> None:
    """RawResponsesStreamEvent is still emitted even when ReasoningDeltaEvent is also emitted."""
    fake = FakeModel()
    fake.set_next_output([
        _make_reasoning_item("thinking"),
        get_text_message("result"),
    ])

    streamed = Runner.run_streamed(Agent(name="A", model=fake), input="hi")

    raw_count = 0
    reasoning_count = 0
    async for event in streamed.stream_events():
        if isinstance(event, RawResponsesStreamEvent):
            raw_count += 1
        elif isinstance(event, ReasoningDeltaEvent):
            reasoning_count += 1

    # The convenience event must supplement, not replace, the raw events.
    assert raw_count > 0
    assert reasoning_count > 0


def test_reasoning_delta_event_importable_from_agents() -> None:
    """ReasoningDeltaEvent can be imported directly from the agents package.

    A plain synchronous test: the original was declared ``async`` and marked
    ``@pytest.mark.asyncio`` despite never awaiting anything, which spins up
    an event loop for a pure import/identity check.
    """
    from agents import ReasoningDeltaEvent as RDE

    assert RDE is ReasoningDeltaEvent


def test_reasoning_delta_event_dataclass() -> None:
    """ReasoningDeltaEvent is a proper dataclass with expected fields."""
    ev = ReasoningDeltaEvent(delta="chunk", snapshot="full chunk")
    assert (ev.delta, ev.snapshot, ev.type) == ("chunk", "full chunk", "reasoning_delta")
Loading