Python: handle streamed A2A update events#4919
Python: handle streamed A2A update events#4919sztoplover-bit wants to merge 2 commits intomicrosoft:mainfrom
Conversation
Python Test Coverage Report •
Python Unit Test Overview
|
||||||||||||||||||||||||||||||
giles17
left a comment
There was a problem hiding this comment.
Automated Code Review
Reviewers: 4 | Confidence: 82%
✓ Correctness
This PR correctly fixes the issue where streaming
TaskArtifactUpdateEventevents were silently discarded in_map_a2a_stream. The new_updates_from_task_update_eventmethod properly extracts content from bothTaskArtifactUpdateEvent(usingartifact.parts) andTaskStatusUpdateEvent(usingstatus.message.parts). The deduplication logic in_updates_from_taskprevents terminal tasks from re-emitting artifacts that were already streamed incrementally, which is validated by the second test. The code correctly gates the new streaming behavior behind theemit_intermediateflag, preserving backward compatibility for non-streaming calers likepoll_task. The implementation is sound and the tests cover both the basic streaming case and the deduplication edge case.
✓ Security Reliability
This PR correctly fixes the core bug where TaskArtifactUpdateEvent and TaskStatusUpdateEvent content was silently dropped during streaming. The new _updates_from_task_update_event method properly extracts incremental content from both event types. However, the deduplication logic that unconditionally suppresses terminal task artifact processing (return []) when emit_intermediate=True introduces a potential silent data-loss path: if an A2A server sends artifacts only on the terminal Task object (without preceding TaskArtifactUpdateEvent chunks), those artifacts are silently discarded because the code bypasses _parse_messages_from_task. The base branch does not have this issue — terminal tasks always fall through to artifact extraction. No security concerns were identified.
✓ Test Coverage
The PR correctly fixes the core bug where
TaskArtifactUpdateEventwas silently discarded during streaming. Two new tests cover the primary artifact streaming path and the deduplication scenario (terminal task artifacts not re-emitted after streaming chunks). However, the new_updates_from_task_update_eventmethod has an untested branch forTaskStatusUpdateEventwith message content, and the deduplication guard (if status.state in TERMINAL_TASK_STATES: return []) could suppress terminal task artifacts when no prior events carried content — an edge case with no test coverage.
✓ Design Approach
The PR correctly fixes the core bug:
TaskArtifactUpdateEventchunks were previously discarded entirely (bound to_update_eventand ignored). The new_updates_from_task_update_eventhelper properly maps artifact update events toAgentResponseUpdateobjects. The de-duplication logic (early-return[]when the terminalTaskStatusUpdateEventhas no message) is intentional and aligns with A2A spec semantics — the final status event is a completion signal, not a content carrier. One genuine design concern remains: the early-return onTERMINAL_TASK_STATESinside theemit_intermediateguard also short-circuits thebackground=Trueterminal-artifact path. Ifemit_intermediate=Trueandbackground=Trueare both active and the terminal event has no message, the terminal task's parsed artifacts are silently dropped — a behavioral regression vs. the prior code where terminal artifacts were always parsed. Additionally,TaskArtifactUpdateEvent.append(replace vs. append semantics) is not surfaced as a first-class field onAgentResponseUpdate, making it impossible for callers to correctly reconstruct content without reaching intoraw_representation.
Automated review by giles17's agents
| """ | ||
| status = task.status | ||
|
|
||
| if emit_intermediate and update_event is not None: | ||
| if event_updates := self._updates_from_task_update_event(update_event): | ||
| return event_updates | ||
| if status.state in TERMINAL_TASK_STATES: |
There was a problem hiding this comment.
Reliability concern: the deduplication guard returns [] for ALL terminal tasks when emit_intermediate=True and update_event is present, regardless of whether any prior artifact events actually delivered content. If no TaskArtifactUpdateEvent chunks were previously streamed (e.g., the server only puts artifacts on the terminal Task), those artifacts are silently lost — a regression from the base branch which always processes terminal task artifacts. This also bypasses the background=True terminal-artifact path. Consider guarding with a check that prior streaming content was yielded, or falling through to _parse_messages_from_task(task) as a safety net when no event content was produced.
| """ | |
| status = task.status | |
| if emit_intermediate and update_event is not None: | |
| if event_updates := self._updates_from_task_update_event(update_event): | |
| return event_updates | |
| if status.state in TERMINAL_TASK_STATES: | |
| if status.state in TERMINAL_TASK_STATES: | |
| # If the event itself carried content, we already returned it above. | |
| # Fall through to _parse_messages_from_task only as a safety net | |
| # when no event content was produced (keeps artifacts for servers | |
| # that don't send TaskArtifactUpdateEvents). | |
| pass |
| ) | ||
| ] | ||
|
|
||
| message = update_event.status.message |
There was a problem hiding this comment.
Minor: this falls through to access update_event.status.message without an explicit isinstance(update_event, TaskStatusUpdateEvent) check. Currently safe because the union has only two types, but an explicit guard would prevent an AttributeError if the A2A SDK adds new event types in the future.
| message = update_event.status.message | |
| if not isinstance(update_event, TaskStatusUpdateEvent): | |
| return [] | |
| message = update_event.status.message |
|
|
||
| message = update_event.status.message | ||
| if message is None or not message.parts: | ||
| return [] | ||
|
|
||
| contents = self._parse_contents_from_a2a(message.parts) | ||
| if not contents: | ||
| return [] | ||
|
|
||
| return [ | ||
| AgentResponseUpdate( | ||
| contents=contents, | ||
| role="assistant" if message.role == A2ARole.agent else "user", | ||
| response_id=update_event.task_id, | ||
| raw_representation=update_event, | ||
| ) |
There was a problem hiding this comment.
This TaskStatusUpdateEvent-with-message branch is not exercised by any test. The existing streaming tests use update_event=None, and the new tests only cover TaskArtifactUpdateEvent. A test sending a (working_task, TaskStatusUpdateEvent(status=TaskStatus(state=working, message=A2AMessage(...)))) tuple would verify role mapping and content extraction through this path.
|
@microsoft-github-policy-service agree |
Summary
TaskArtifactUpdateEventchunks asAgentResponseUpdateitems so streaming callers receive incremental outputVerification
./.venv/bin/pytest python/packages/a2a/tests/test_a2a_agent.py -qTaskArtifactUpdateEventstreaming script now yields['Hello']Fixes #4901