fix(streaming): StreamTaskMessageFull closes the coalescing buffer#426
Conversation
1d86e8a to
b30a90b
Compare
A StreamTaskMessageFull ends the stream and marks the context done, but it did not close the coalescing buffer. __aexit__'s close() then short-circuits on _is_closed and never stops the buffer's ticker, leaving an orphaned background task. Buffered deltas could also publish after the terminal Full, which a consumer treating Full as final reads as a stale duplicate tail. Drain and stop the buffer before publishing the Full (deltas -> Full ordering), and reap the buffer in close() before the _is_closed short-circuit so it can't be orphaned on any path. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
d15800e to
a75aa68
Compare
…ng delta add() checked _closed before taking the lock, so a delta racing a Full-driven buffer close() could pass the check, then append after the buffer was drained and its ticker shut down — stranding the delta, never published. Re-check _closed under the lock before appending. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
| # (deltas -> Full) instead of trailing the terminal Full as a stale | ||
| # duplicate tail. This also stops the ticker, which would otherwise be | ||
| # orphaned when __aexit__'s close() short-circuits on _is_closed. | ||
| if isinstance(update, StreamTaskMessageFull) and self._buffer is not None: |
There was a problem hiding this comment.
I saw that Greptile also commented about this and looks like it resolved its own comment but it looks to me like it's still possible that we could have a race here? e.g.
- This code block closes the buffer and sets it to none
- stream_update (or messages.update) below is being awaited
- another delta comes in and skips this block since buffer is already none but _is_closed is not yet set to True
In this case, consumers will still see a delta after full since it just falls through to publishing directly. If this is something we care about (and my logic checks out and you also believe there's a race), we could have a terminal-in-progress flag like _is_closing set before awaiting buffer close to reject deltas?
There was a problem hiding this comment.
Good point, started introducing _is_closing, but it qot quite complex. For now, I would keep this PR more focused on the deterministic issue with the buffer leak, and make the more general imrovements in a follow-up PR (most likely #418 once that is rebased)
| result = await self._streaming_service.stream_update(update) | ||
|
|
||
| if isinstance(update, StreamTaskMessageDone): | ||
| await self.close() |
There was a problem hiding this comment.
if i'm reading this correctly, this code block still has a latent bug similar to the one we fixed for the Full case above, right? curious about just abstracting the code above into something like _reap_buffer and calling that here as well for readability and more defensiveness going forward.
There was a problem hiding this comment.
Done did not have this problem, as the self.close() called here will close the buffer. close() is unfortunately specific to Done, so Full cannot call it directly, but agree that the common _reap_buffer can help. Added!
smoreinis
left a comment
There was a problem hiding this comment.
thanks for this fix - overall the PR is solid and I'm fine with it going in as-is.
just a couple of minor nits in the comments that I think could be nice to haves, but happy to leave these at the author discretion.
… deltas Review follow-ups on the StreamTaskMessageFull fix: - Extract _reap_buffer() and use it in both close() and the Full branch. - Add an _is_closing flag set when a terminal (Full/Done) starts processing; the delta path drops a late delta that races in after the buffer is reaped but before _is_closed is set, so it can't publish after the terminal. - Clarify the ordering-test comment (batching of the two deltas is irrelevant; the invariant is deltas-before-Full). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… failure Greptile follow-ups on the _is_closing guard: - close() sets _is_closing before reaping the buffer, so a delta racing a direct close() (e.g. via __aexit__) can't slip past the None buffer and publish after DONE. - A failed terminal write (e.g. messages.update raising) rolls _is_closing back to False, so the still-open context doesn't silently drop later deltas. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…terminal failure" This reverts commit 87a6ad0.
…ak fix minimal The _is_closing flag was half-wired (set in stream_update but not close()) and only mattered under concurrent stream_update calls, which the buffer's single-producer design doesn't support. Remove it to keep this PR scoped to the StreamTaskMessageFull ticker-leak fix. Concurrency hardening and Done ordering are tracked in #418. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
eberki-scale
left a comment
There was a problem hiding this comment.
@smoreinis thanks for the review!
commented on the specific comment as well - I think improving robustness with _is_closing is a great improvement, but it got quite complex and wanted to keep this PR focused on the deterministic buffer issue with the StreamTaskMessageFull closer. Concurrency issues could be improved later in the follow up #426 more generally.
| result = await self._streaming_service.stream_update(update) | ||
|
|
||
| if isinstance(update, StreamTaskMessageDone): | ||
| await self.close() |
There was a problem hiding this comment.
Done did not have this problem, as the self.close() called here will close the buffer. close() is unfortunately specific to Done, so Full cannot call it directly, but agree that the common _reap_buffer can help. Added!
| # (deltas -> Full) instead of trailing the terminal Full as a stale | ||
| # duplicate tail. This also stops the ticker, which would otherwise be | ||
| # orphaned when __aexit__'s close() short-circuits on _is_closed. | ||
| if isinstance(update, StreamTaskMessageFull) and self._buffer is not None: |
There was a problem hiding this comment.
Good point, started introducing _is_closing, but it qot quite complex. For now, I would keep this PR more focused on the deterministic issue with the buffer leak, and make the more general imrovements in a follow-up PR (most likely #418 once that is rebased)
|
@eberki-scale this all makes sense! LGTM |
Summary
A
StreamTaskMessageFullends the stream and marks theStreamingTaskMessageContextdone, but it did not close the coalescing buffer. When the context later exits,__aexit__ → close()hits itsif self._is_closed: returnguard and never stops the buffer's background ticker — leaving an orphaned task per such stream. Separately, any deltas still buffered when theFullarrives could be published after the terminalFull, which a consumer treatingFullas the final message reads as a stale duplicate tail.Fix
Two small changes to
StreamingTaskMessageContextinagentex/lib/core/services/adk/streaming.py:stream_update— when aStreamTaskMessageFullarrives, drain and close the buffer before publishing theFull, so leftover deltas land in order (deltas → Full) and the ticker is stopped.close()— reap the buffer before the_is_closedshort-circuit, so a context already marked done (by aFullon another path) can never leave the ticker orphaned.No change to
CoalescingBufferitself.Tests
New
TestFullMessageClosesBuffer:test_full_message_stops_ticker— after aFull, the buffer is reaped and its ticker task isdone()(not orphaned).test_full_is_terminal_publish_no_trailing_deltas— buffered deltas publish before theFull; theFullis the terminal publish.Full streaming suite: 33 passed (
tests/lib/core/services/adk/test_streaming.py).Scope
Deliberately narrow — just the
StreamTaskMessageFullorphan + ordering fix, split out from the broader buffer work in #418 for easier review.Greptile Summary
StreamingTaskMessageContextso a terminalStreamTaskMessageFulldrains and closes the coalescing buffer before publishing the full message.close()returns early for an already-closed context.Confidence Score: 4/5
The change fixes the intended buffer cleanup path, but the terminal full-message path still needs serialization with close().
The modified streaming code and focused tests make the intended behavior clear, and the remaining issue is localized to concurrent terminal handling.
src/agentex/lib/core/services/adk/streaming.py
What T-Rex did
Comments Outside Diff (1)
src/agentex/lib/core/services/adk/streaming.py, line 529-530 (link)StreamTaskMessageDonestill publishes the terminal event before the coalescing buffer is drained. In coalesced mode, any buffered deltas remain inself._bufferwhenstream_update(update)sendsDone; only afterwards doesclose()reap the buffer, so consumers can seeDonefollowed by a delta. Because_is_closedis still false,close()can also emit a secondDone. Please drain the buffer before publishingDoneand avoid re-emitting it on this path.Artifacts
Repro: async harness that buffers a coalesced delta before explicit Done
Repro: failing run output showing Done before buffered delta and duplicate Done
Prompt To Fix With AI
Prompt To Fix All With AI
Reviews (7): Last reviewed commit: "refactor(streaming): drop _is_closing co..." | Re-trigger Greptile