Skip to content

fix(streaming): StreamTaskMessageFull closes the coalescing buffer#426

Merged
eberki-scale merged 6 commits into
nextfrom
endre/full-closes-buffer
Jun 26, 2026
Merged

fix(streaming): StreamTaskMessageFull closes the coalescing buffer#426
eberki-scale merged 6 commits into
nextfrom
endre/full-closes-buffer

Conversation

@eberki-scale

@eberki-scale eberki-scale commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

Summary

A StreamTaskMessageFull ends the stream and marks the StreamingTaskMessageContext done, but it did not close the coalescing buffer. When the context later exits, __aexit__ → close() hits its if self._is_closed: return guard and never stops the buffer's background ticker — leaving an orphaned task per such stream. Separately, any deltas still buffered when the Full arrives could be published after the terminal Full, which a consumer treating Full as the final message reads as a stale duplicate tail.

Fix

Two small changes to StreamingTaskMessageContext in agentex/lib/core/services/adk/streaming.py:

  1. stream_update — when a StreamTaskMessageFull arrives, drain and close the buffer before publishing the Full, so leftover deltas land in order (deltas → Full) and the ticker is stopped.
  2. close() — reap the buffer before the _is_closed short-circuit, so a context already marked done (by a Full on another path) can never leave the ticker orphaned.

No change to CoalescingBuffer itself.

Tests

New TestFullMessageClosesBuffer:

  • test_full_message_stops_ticker — after a Full, the buffer is reaped and its ticker task is done() (not orphaned).
  • test_full_is_terminal_publish_no_trailing_deltas — buffered deltas publish before the Full; the Full is the terminal publish.

Full streaming suite: 33 passed (tests/lib/core/services/adk/test_streaming.py).

Scope

Deliberately narrow — just the StreamTaskMessageFull orphan + ordering fix, split out from the broader buffer work in #418 for easier review.

Greptile Summary

  • Updates StreamingTaskMessageContext so a terminal StreamTaskMessageFull drains and closes the coalescing buffer before publishing the full message.
  • Reaps the coalescing buffer before close() returns early for an already-closed context.
  • Adds streaming tests covering ticker cleanup, terminal full-message ordering, and add/close race behavior.

Confidence Score: 4/5

The change fixes the intended buffer cleanup path, but the terminal full-message path still needs serialization with close().

The modified streaming code and focused tests make the intended behavior clear, and the remaining issue is localized to concurrent terminal handling.

src/agentex/lib/core/services/adk/streaming.py

T-Rex T-Rex Logs

What T-Rex did

  • Ran a focused async repro script against the real StreamingTaskMessageContext and StreamTaskMessageFull types, using test doubles for publishing and persistence, which reproduced the terminal update race.
  • Inspected the before/after buffer and ticker state to confirm that the buffer was reaped and the ticker stopped after Full and close/exit.
  • Verified the non-concurrent terminal publish order: Start -> Delta('AB') -> Full('AB'), with trailing deltas after Full cleared and exit code 0.
  • Compared the base and head logs for the non-DONE short-circuit scenario and observed cleanup completed with the buffer closed, the original buffer cleared, and the lifecycle matching the expected behavior.
  • Observed the add-close race path where the racing delta is not appended or flushed after the buffer is closed.

View all artifacts

T-Rex Ran code and verified through T-Rex

Comments Outside Diff (1)

  1. src/agentex/lib/core/services/adk/streaming.py, line 529-530 (link)

    P1 Drain before Done

    StreamTaskMessageDone still publishes the terminal event before the coalescing buffer is drained. In coalesced mode, any buffered deltas remain in self._buffer when stream_update(update) sends Done; only afterwards does close() reap the buffer, so consumers can see Done followed by a delta. Because _is_closed is still false, close() can also emit a second Done. Please drain the buffer before publishing Done and avoid re-emitting it on this path.

    Artifacts

    Repro: async harness that buffers a coalesced delta before explicit Done

    • Contains supporting evidence from the run (text/x-python; charset=utf-8).

    Repro: failing run output showing Done before buffered delta and duplicate Done

    • Keeps the command output available without making the summary code-heavy.

    View artifacts

    T-Rex Ran code and verified through T-Rex

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: src/agentex/lib/core/services/adk/streaming.py
    Line: 529-530
    
    Comment:
    **Drain before Done**
    
    `StreamTaskMessageDone` still publishes the terminal event before the coalescing buffer is drained. In coalesced mode, any buffered deltas remain in `self._buffer` when `stream_update(update)` sends `Done`; only afterwards does `close()` reap the buffer, so consumers can see `Done` followed by a delta. Because `_is_closed` is still false, `close()` can also emit a second `Done`. Please drain the buffer before publishing `Done` and avoid re-emitting it on this path.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Cursor Fix in Claude Code Fix in Codex

Fix All in Cursor Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
src/agentex/lib/core/services/adk/streaming.py:509-512
**Serialize terminal updates**

`StreamTaskMessageFull` drains the buffer before publishing the terminal `Full`, but the context is still not marked closed or closing while that publish is awaiting. If `close()` runs in that window, it sees `_is_closed == False` and publishes/persists `DONE`; when this coroutine resumes, it then publishes the `Full` after `DONE`. Consumers can receive two terminal events in the wrong order, and the persisted message can be updated by two terminal paths. Please guard the `Full` path and `close()` with the same terminal state or a shared lock so only one terminal path can run.

Reviews (7): Last reviewed commit: "refactor(streaming): drop _is_closing co..." | Re-trigger Greptile

Greptile also left 1 inline comment on this PR.

@stainless-app stainless-app Bot force-pushed the next branch 2 times, most recently from 1d86e8a to b30a90b Compare June 23, 2026 22:17
A StreamTaskMessageFull ends the stream and marks the context done, but it did
not close the coalescing buffer. __aexit__'s close() then short-circuits on
_is_closed and never stops the buffer's ticker, leaving an orphaned background
task. Buffered deltas could also publish after the terminal Full, which a
consumer treating Full as final reads as a stale duplicate tail.

Drain and stop the buffer before publishing the Full (deltas -> Full ordering),
and reap the buffer in close() before the _is_closed short-circuit so it can't
be orphaned on any path.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@eberki-scale eberki-scale force-pushed the endre/full-closes-buffer branch from d15800e to a75aa68 Compare June 25, 2026 14:30
Comment thread src/agentex/lib/core/services/adk/streaming.py Outdated
…ng delta

add() checked _closed before taking the lock, so a delta racing a Full-driven
buffer close() could pass the check, then append after the buffer was drained
and its ticker shut down — stranding the delta, never published. Re-check
_closed under the lock before appending.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
# (deltas -> Full) instead of trailing the terminal Full as a stale
# duplicate tail. This also stops the ticker, which would otherwise be
# orphaned when __aexit__'s close() short-circuits on _is_closed.
if isinstance(update, StreamTaskMessageFull) and self._buffer is not None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that Greptile also commented about this and looks like it resolved its own comment but it looks to me like it's still possible that we could have a race here? e.g.

  1. This code block closes the buffer and sets it to none
  2. stream_update (or messages.update) below is being awaited
  3. another delta comes in and skips this block since buffer is already none but _is_closed is not yet set to True

In this case, consumers will still see a delta after full since it just falls through to publishing directly. If this is something we care about (and my logic checks out and you also believe there's a race), we could have a terminal-in-progress flag like _is_closing set before awaiting buffer close to reject deltas?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, started introducing _is_closing, but it qot quite complex. For now, I would keep this PR more focused on the deterministic issue with the buffer leak, and make the more general imrovements in a follow-up PR (most likely #418 once that is rebased)

result = await self._streaming_service.stream_update(update)

if isinstance(update, StreamTaskMessageDone):
await self.close()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i'm reading this correctly, this code block still has a latent bug similar to the one we fixed for the Full case above, right? curious about just abstracting the code above into something like _reap_buffer and calling that here as well for readability and more defensiveness going forward.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done did not have this problem, as the self.close() called here will close the buffer. close() is unfortunately specific to Done, so Full cannot call it directly, but agree that the common _reap_buffer can help. Added!

Comment thread src/agentex/lib/core/services/adk/streaming.py Outdated
Comment thread tests/lib/core/services/adk/test_streaming.py Outdated

@smoreinis smoreinis left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for this fix - overall the PR is solid and I'm fine with it going in as-is.
just a couple of minor nits in the comments that I think could be nice to haves, but happy to leave these at the author discretion.

… deltas

Review follow-ups on the StreamTaskMessageFull fix:
- Extract _reap_buffer() and use it in both close() and the Full branch.
- Add an _is_closing flag set when a terminal (Full/Done) starts processing;
  the delta path drops a late delta that races in after the buffer is reaped
  but before _is_closed is set, so it can't publish after the terminal.
- Clarify the ordering-test comment (batching of the two deltas is irrelevant;
  the invariant is deltas-before-Full).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread src/agentex/lib/core/services/adk/streaming.py
Comment thread src/agentex/lib/core/services/adk/streaming.py Outdated
… failure

Greptile follow-ups on the _is_closing guard:
- close() sets _is_closing before reaping the buffer, so a delta racing a direct
  close() (e.g. via __aexit__) can't slip past the None buffer and publish after DONE.
- A failed terminal write (e.g. messages.update raising) rolls _is_closing back to
  False, so the still-open context doesn't silently drop later deltas.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread src/agentex/lib/core/services/adk/streaming.py Outdated
Comment thread src/agentex/lib/core/services/adk/streaming.py Outdated
eberki-scale and others added 2 commits June 26, 2026 10:09
…ak fix minimal

The _is_closing flag was half-wired (set in stream_update but not close()) and
only mattered under concurrent stream_update calls, which the buffer's
single-producer design doesn't support. Remove it to keep this PR scoped to the
StreamTaskMessageFull ticker-leak fix. Concurrency hardening and Done ordering
are tracked in #418.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread src/agentex/lib/core/services/adk/streaming.py

@eberki-scale eberki-scale left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smoreinis thanks for the review!
commented on the specific comment as well - I think improving robustness with _is_closing is a great improvement, but it got quite complex and wanted to keep this PR focused on the deterministic buffer issue with the StreamTaskMessageFull closer. Concurrency issues could be improved later in the follow up #426 more generally.

result = await self._streaming_service.stream_update(update)

if isinstance(update, StreamTaskMessageDone):
await self.close()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done did not have this problem, as the self.close() called here will close the buffer. close() is unfortunately specific to Done, so Full cannot call it directly, but agree that the common _reap_buffer can help. Added!

# (deltas -> Full) instead of trailing the terminal Full as a stale
# duplicate tail. This also stops the ticker, which would otherwise be
# orphaned when __aexit__'s close() short-circuits on _is_closed.
if isinstance(update, StreamTaskMessageFull) and self._buffer is not None:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, started introducing _is_closing, but it qot quite complex. For now, I would keep this PR more focused on the deterministic issue with the buffer leak, and make the more general imrovements in a follow-up PR (most likely #418 once that is rebased)

@smoreinis

Copy link
Copy Markdown
Contributor

@eberki-scale this all makes sense! LGTM

@eberki-scale eberki-scale merged commit 94ce668 into next Jun 26, 2026
48 checks passed
@eberki-scale eberki-scale deleted the endre/full-closes-buffer branch June 26, 2026 15:43
@stainless-app stainless-app Bot mentioned this pull request Jun 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants