Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ dev = [
"no_implicit_optional",
"trio",
"uvicorn>=0.35.0",
"pytest-timeout>=2.4.0",
"a2a-sdk[all]",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ async def push_notification_callback() -> None:

except Exception:
logger.exception('Agent execution failed')
producer_task.cancel()
raise
finally:
if interrupted_or_non_blocking:
Expand Down Expand Up @@ -435,7 +436,12 @@ async def _cleanup_producer(
task_id: str,
) -> None:
"""Cleans up the agent execution task and queue manager entry."""
await producer_task
try:
await producer_task
except asyncio.CancelledError:
logger.debug(
'Producer task %s was cancelled during cleanup', task_id
)
await self._queue_manager.close(task_id)
async with self._running_agents_lock:
self._running_agents.pop(task_id, None)
Expand Down
51 changes: 51 additions & 0 deletions tests/server/request_handlers/test_default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2644,3 +2644,54 @@ async def test_on_message_send_stream_task_id_provided_but_task_not_found():
f'Task {task_id} was specified but does not exist'
in exc_info.value.error.message
)


class HelloWorldAgentExecutor(AgentExecutor):
"""Test Agent Implementation."""

async def execute(
self,
context: RequestContext,
event_queue: EventQueue,
) -> None:
updater = TaskUpdater(
event_queue,
task_id=context.task_id or str(uuid.uuid4()),
context_id=context.context_id or str(uuid.uuid4()),
)
await updater.update_status(TaskState.working)
await updater.complete()

async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
raise NotImplementedError('cancel not supported')


# Repro is straight from the https://github.com/a2aproject/a2a-python/issues/609.
# It uses timeout to test against infinite wait, if it's going to be flaky,
# we should reconsider the approach.
@pytest.mark.asyncio
@pytest.mark.timeout(1)
async def test_on_message_send_error_does_not_hang():
"""Test that if the consumer raises an exception during blocking wait, the producer is cancelled and no deadlock occurs."""
agent = HelloWorldAgentExecutor()
task_store = AsyncMock(spec=TaskStore)
task_store.save.side_effect = RuntimeError('This is an Error!')

request_handler = DefaultRequestHandler(
agent_executor=agent, task_store=task_store
)

params = MessageSendParams(
message=Message(
role=Role.user,
message_id='msg_error_blocking',
parts=[Part(root=TextPart(text='Test message'))],
)
)

with pytest.raises(RuntimeError, match='This is an Error!'):
await request_handler.on_message_send(
params, create_server_call_context()
)
1 change: 0 additions & 1 deletion tests/server/request_handlers/test_jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ async def streaming_coro():

self.assertIsInstance(response.root, JSONRPCErrorResponse)
assert response.root.error == UnsupportedOperationError() # type: ignore
mock_agent_executor.execute.assert_called_once()

@patch(
'a2a.server.agent_execution.simple_request_context_builder.SimpleRequestContextBuilder.build'
Expand Down
14 changes: 14 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading