diff --git a/pyproject.toml b/pyproject.toml index 496d5d51..1a8f0af6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -105,6 +105,7 @@ dev = [ "no_implicit_optional", "trio", "uvicorn>=0.35.0", + "pytest-timeout>=2.4.0", "a2a-sdk[all]", ] diff --git a/src/a2a/server/request_handlers/default_request_handler.py b/src/a2a/server/request_handlers/default_request_handler.py index 30d1ee89..cb002569 100644 --- a/src/a2a/server/request_handlers/default_request_handler.py +++ b/src/a2a/server/request_handlers/default_request_handler.py @@ -330,6 +330,7 @@ async def push_notification_callback() -> None: except Exception: logger.exception('Agent execution failed') + producer_task.cancel() raise finally: if interrupted_or_non_blocking: @@ -435,7 +436,12 @@ async def _cleanup_producer( task_id: str, ) -> None: """Cleans up the agent execution task and queue manager entry.""" - await producer_task + try: + await producer_task + except asyncio.CancelledError: + logger.debug( + 'Producer task %s was cancelled during cleanup', task_id + ) await self._queue_manager.close(task_id) async with self._running_agents_lock: self._running_agents.pop(task_id, None) diff --git a/tests/server/request_handlers/test_default_request_handler.py b/tests/server/request_handlers/test_default_request_handler.py index 88dd77ab..067b8bb5 100644 --- a/tests/server/request_handlers/test_default_request_handler.py +++ b/tests/server/request_handlers/test_default_request_handler.py @@ -2644,3 +2644,54 @@ async def test_on_message_send_stream_task_id_provided_but_task_not_found(): f'Task {task_id} was specified but does not exist' in exc_info.value.error.message ) + + +class HelloWorldAgentExecutor(AgentExecutor): + """Test Agent Implementation.""" + + async def execute( + self, + context: RequestContext, + event_queue: EventQueue, + ) -> None: + updater = TaskUpdater( + event_queue, + task_id=context.task_id or str(uuid.uuid4()), + context_id=context.context_id or str(uuid.uuid4()), + ) + await updater.update_status(TaskState.working) + await updater.complete() + + async def cancel( + self, context: RequestContext, event_queue: EventQueue + ) -> None: + raise NotImplementedError('cancel not supported') + + +# Repro is straight from the https://github.com/a2aproject/a2a-python/issues/609. +# It uses timeout to test against infinite wait, if it's going to be flaky, +# we should reconsider the approach. +@pytest.mark.asyncio +@pytest.mark.timeout(1) +async def test_on_message_send_error_does_not_hang(): + """Test that if the consumer raises an exception during blocking wait, the producer is cancelled and no deadlock occurs.""" + agent = HelloWorldAgentExecutor() + task_store = AsyncMock(spec=TaskStore) + task_store.save.side_effect = RuntimeError('This is an Error!') + + request_handler = DefaultRequestHandler( + agent_executor=agent, task_store=task_store + ) + + params = MessageSendParams( + message=Message( + role=Role.user, + message_id='msg_error_blocking', + parts=[Part(root=TextPart(text='Test message'))], + ) + ) + + with pytest.raises(RuntimeError, match='This is an Error!'): + await request_handler.on_message_send( + params, create_server_call_context() + ) diff --git a/tests/server/request_handlers/test_jsonrpc_handler.py b/tests/server/request_handlers/test_jsonrpc_handler.py index 4ed6e702..08dfd63f 100644 --- a/tests/server/request_handlers/test_jsonrpc_handler.py +++ b/tests/server/request_handlers/test_jsonrpc_handler.py @@ -322,7 +322,6 @@ async def streaming_coro(): self.assertIsInstance(response.root, JSONRPCErrorResponse) assert response.root.error == UnsupportedOperationError() # type: ignore - mock_agent_executor.execute.assert_called_once() @patch( 'a2a.server.agent_execution.simple_request_context_builder.SimpleRequestContextBuilder.build' diff --git a/uv.lock b/uv.lock index 13a4a919..cb5161ac 100644 --- a/uv.lock +++ b/uv.lock @@ -76,6 +76,7 @@ dev = [ { name = "pytest-asyncio" }, { name = "pytest-cov" }, { name = "pytest-mock" }, + { name = "pytest-timeout" }, { name = "pytest-xdist" }, { name = "pyupgrade" }, { name = "respx" }, @@ -138,6 +139,7 @@ dev = [ { name = "pytest-asyncio", specifier = ">=0.26.0" }, { name = "pytest-cov", specifier = ">=6.1.1" }, { name = "pytest-mock", specifier = ">=3.14.0" }, + { name = "pytest-timeout", specifier = ">=2.4.0" }, { name = "pytest-xdist", specifier = ">=3.6.1" }, { name = "pyupgrade" }, { name = "respx", specifier = ">=0.20.2" }, @@ -1814,6 +1816,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5a/cc/06253936f4a7fa2e0f48dfe6d851d9c56df896a9ab09ac019d70b760619c/pytest_mock-3.15.1-py3-none-any.whl", hash = "sha256:0a25e2eb88fe5168d535041d09a4529a188176ae608a6d249ee65abc0949630d", size = 10095, upload-time = "2025-09-16T16:37:25.734Z" }, ] +[[package]] +name = "pytest-timeout" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ac/82/4c9ecabab13363e72d880f2fb504c5f750433b2b6f16e99f4ec21ada284c/pytest_timeout-2.4.0.tar.gz", hash = "sha256:7e68e90b01f9eff71332b25001f85c75495fc4e3a836701876183c4bcfd0540a", size = 17973, upload-time = "2025-05-05T19:44:34.99Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382, upload-time = "2025-05-05T19:44:33.502Z" }, +] + [[package]] name = "pytest-xdist" version = "3.8.0"