Skip to content

Commit 88621c5

Browse files
simonrosenbergDebug AgentclaudePsiACE
authored
fix: reject pending requests on EOF to prevent infinite hang (#86)
* fix: reject pending requests on EOF to prevent infinite hang When the remote end closes the connection (e.g., subprocess crashes), _receive_loop exits cleanly on EOF without raising an exception. This means _on_receive_error is never called and pending outgoing request futures hang forever. Add reject_all_outgoing() after the receive loop breaks on EOF so callers get a ConnectionError instead of an infinite hang. Fixes #85 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: fail fast after connection EOF --------- Co-authored-by: Debug Agent <debug@example.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Chojan Shang <psiace@apache.org>
1 parent d53722c commit 88621c5

2 files changed

Lines changed: 45 additions & 1 deletion

File tree

src/acp/connection.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def __init__(
8383
self._tasks.add_error_handler(self._on_task_error)
8484
self._queue = queue or InMemoryMessageQueue()
8585
self._closed = False
86+
self._disconnected = False
8687
self._sender = (sender_factory or self._default_sender_factory)(self._writer, self._tasks)
8788
if listening:
8889
self._recv_task = self._tasks.create(
@@ -132,6 +133,7 @@ def add_observer(self, observer: StreamObserver) -> None:
132133
self._observers.append(observer)
133134

134135
async def send_request(self, method: str, params: JsonValue | None = None) -> Any:
136+
self._raise_if_unavailable()
135137
request_id = self._next_request_id
136138
self._next_request_id += 1
137139
future = self._state.register_outgoing(request_id, method)
@@ -141,6 +143,7 @@ async def send_request(self, method: str, params: JsonValue | None = None) -> An
141143
return await future
142144

143145
async def send_notification(self, method: str, params: JsonValue | None = None) -> None:
146+
self._raise_if_unavailable()
144147
payload = {"jsonrpc": "2.0", "method": method, "params": params}
145148
await self._sender.send(payload)
146149
self._notify_observers(StreamDirection.OUTGOING, payload)
@@ -160,6 +163,7 @@ async def _receive_loop(self) -> None:
160163
await self._process_message(message)
161164
except asyncio.CancelledError:
162165
return
166+
self._disconnect()
163167

164168
async def _process_message(self, message: dict[str, Any]) -> None:
165169
method = message.get("method")
@@ -262,7 +266,7 @@ async def _handle_response(self, message: dict[str, Any]) -> None:
262266

263267
def _on_receive_error(self, task: asyncio.Task[Any], exc: BaseException) -> None:
264268
logging.exception("Receive loop failed", exc_info=exc)
265-
self._state.reject_all_outgoing(exc)
269+
self._disconnect()
266270

267271
def _on_task_error(self, task: asyncio.Task[Any], exc: BaseException) -> None:
268272
logging.exception("Background task failed", exc_info=exc)
@@ -285,3 +289,13 @@ def _default_dispatcher_factory(
285289

286290
def _default_sender_factory(self, writer: asyncio.StreamWriter, supervisor: TaskSupervisor) -> MessageSender:
287291
return MessageSender(writer, supervisor)
292+
293+
def _disconnect(self) -> None:
294+
if self._disconnected:
295+
return
296+
self._disconnected = True
297+
self._state.reject_all_outgoing(ConnectionError("Connection closed"))
298+
299+
def _raise_if_unavailable(self) -> None:
300+
if self._disconnected or self._closed:
301+
raise ConnectionError("Connection closed")

tests/test_rpc.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
update_agent_message_text,
2727
update_tool_call,
2828
)
29+
from acp.connection import Connection
2930
from acp.core import AgentSideConnection, ClientSideConnection
3031
from acp.schema import (
3132
AgentMessageChunk,
@@ -199,6 +200,35 @@ async def read_one(i: int):
199200
assert res.content == f"Content {i}"
200201

201202

203+
@pytest.mark.asyncio
204+
async def test_pending_request_fails_when_remote_sends_eof(server):
205+
conn = Connection(lambda method, params, is_notification: None, server.client_writer, server.client_reader)
206+
request = asyncio.create_task(conn.send_request("ping", {"value": 1}))
207+
208+
await asyncio.sleep(0.05)
209+
server.server_writer.close()
210+
await server.server_writer.wait_closed()
211+
212+
with pytest.raises(ConnectionError, match="Connection closed"):
213+
await asyncio.wait_for(request, timeout=1.0)
214+
215+
await conn.close()
216+
217+
218+
@pytest.mark.asyncio
219+
async def test_new_requests_fail_fast_after_remote_eof(server):
220+
conn = Connection(lambda method, params, is_notification: None, server.client_writer, server.client_reader)
221+
222+
server.server_writer.close()
223+
await server.server_writer.wait_closed()
224+
await asyncio.sleep(0.05)
225+
226+
with pytest.raises(ConnectionError, match="Connection closed"):
227+
await asyncio.wait_for(conn.send_request("ping", {"value": 1}), timeout=1.0)
228+
229+
await conn.close()
230+
231+
202232
@pytest.mark.asyncio
203233
async def test_invalid_params_results_in_error_response(connect, server):
204234
# Only start agent-side (server) so we can inject raw request from client socket

0 commit comments

Comments
 (0)