Skip to content

Commit a75aa68

Browse files
eberki-scaleclaude
andcommitted
fix(streaming): StreamTaskMessageFull closes the coalescing buffer
A StreamTaskMessageFull ends the stream and marks the context done, but it did not close the coalescing buffer. __aexit__'s close() then short-circuits on _is_closed and never stops the buffer's ticker, leaving an orphaned background task. Buffered deltas could also publish after the terminal Full, which a consumer treating Full as final reads as a stale duplicate tail. Drain and stop the buffer before publishing the Full (deltas -> Full ordering), and reap the buffer in close() before the _is_closed short-circuit so it can't be orphaned on any path. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 521c60d commit a75aa68

2 files changed

Lines changed: 74 additions & 6 deletions

File tree

src/agentex/lib/core/services/adk/streaming.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -420,15 +420,17 @@ async def close(self) -> TaskMessage:
420420
if not self.task_message:
421421
raise ValueError("Context not properly initialized - no task message")
422422

423-
if self._is_closed:
424-
return self.task_message # Already done
425-
426-
# Drain any buffered deltas before announcing DONE so consumers see the
427-
# full sequence in order.
423+
# Reap the buffer (stopping its ticker) before the _is_closed
424+
# short-circuit, so a context already marked done by a Full update can't
425+
# leave the ticker orphaned. Draining here also lets consumers see the
426+
# full delta sequence in order before DONE.
428427
if self._buffer is not None:
429428
await self._buffer.close()
430429
self._buffer = None
431430

431+
if self._is_closed:
432+
return self.task_message # Already done (buffer reaped above)
433+
432434
# Send the DONE event
433435
done_event = StreamTaskMessageDone(
434436
parent_task_message=self.task_message,
@@ -486,6 +488,15 @@ async def stream_update(self, update: TaskMessageUpdate) -> TaskMessageUpdate |
486488
await self._buffer.add(update)
487489
return update
488490

491+
# A Full ends the stream and supersedes buffered deltas. Drain and stop
492+
# the buffer BEFORE publishing the Full, so leftover deltas land in order
493+
# (deltas -> Full) instead of trailing the terminal Full as a stale
494+
# duplicate tail. This also stops the ticker, which would otherwise be
495+
# orphaned when __aexit__'s close() short-circuits on _is_closed.
496+
if isinstance(update, StreamTaskMessageFull) and self._buffer is not None:
497+
await self._buffer.close()
498+
self._buffer = None
499+
489500
result = await self._streaming_service.stream_update(update)
490501

491502
if isinstance(update, StreamTaskMessageDone):

tests/lib/core/services/adk/test_streaming.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
ToolResponseDelta,
2323
ReasoningSummaryDelta,
2424
)
25-
from agentex.types.task_message_update import StreamTaskMessageDelta
25+
from agentex.types.task_message_update import (
26+
StreamTaskMessageFull,
27+
StreamTaskMessageDelta,
28+
)
2629
from agentex.lib.core.services.adk.streaming import (
2730
CoalescingBuffer,
2831
StreamingTaskMessageContext,
@@ -520,3 +523,57 @@ async def test_open_without_created_at_passes_omit(self) -> None:
520523

521524
kwargs = client.messages.create.call_args.kwargs
522525
assert kwargs["created_at"] is omit
526+
527+
528+
class TestFullMessageClosesBuffer:
529+
"""A StreamTaskMessageFull must stop the buffer ticker and drain its deltas
530+
before the terminal Full. Marking the context done without closing the
531+
buffer leaves close()'s _is_closed short-circuit to orphan the ticker, and
532+
publishing buffered deltas after the Full reads as a stale duplicate tail."""
533+
534+
@pytest.mark.asyncio
535+
async def test_full_message_stops_ticker(self) -> None:
536+
ctx, _svc, tm = await _make_context("coalesced")
537+
# A delta makes the buffer and its ticker live.
538+
await ctx.stream_update(_text(tm, "hello"))
539+
buf = ctx._buffer
540+
assert buf is not None
541+
task = buf._task
542+
assert task is not None and not task.done()
543+
544+
await ctx.stream_update(
545+
StreamTaskMessageFull(
546+
parent_task_message=tm,
547+
content=TextContent(author="agent", content="final", format="markdown"),
548+
type="full",
549+
)
550+
)
551+
552+
assert ctx._buffer is None, "Full message left the buffer un-closed"
553+
assert task.done(), "coalescing-buffer ticker still running after Full (orphaned)"
554+
555+
@pytest.mark.asyncio
556+
async def test_full_is_terminal_publish_no_trailing_deltas(self) -> None:
557+
# Buffered deltas must publish BEFORE the Full, never after (a trailing
558+
# delta after the terminal Full reads as a stale duplicate tail).
559+
ctx, svc, tm = await _make_context("coalesced")
560+
# "alpha" flushes immediately; "beta" stays buffered in the window.
561+
await ctx.stream_update(_text(tm, "alpha"))
562+
await ctx.stream_update(_text(tm, "beta"))
563+
564+
full = StreamTaskMessageFull(
565+
parent_task_message=tm,
566+
content=TextContent(author="agent", content="alphabeta", format="markdown"),
567+
type="full",
568+
)
569+
await ctx.stream_update(full)
570+
571+
published = [c.args[0] for c in svc.stream_update.await_args_list]
572+
assert published, "nothing was published"
573+
assert published[-1] is full, (
574+
f"Full must be the terminal publish; saw trailing "
575+
f"{type(published[-1]).__name__} after it (stale duplicate tail)"
576+
)
577+
assert any(isinstance(u, StreamTaskMessageDelta) for u in published[:-1]), (
578+
"expected the buffered deltas to be published before the Full"
579+
)

0 commit comments

Comments
 (0)