Code Review
This pull request introduces A2A (Agent-to-Agent) and ACP (Agent Client Protocol) support, including a proxy server for dispatching sessions to backend agents. My review identified several critical issues: global stdout redirection in MSAgentA2AExecutor and MSAgentACPServer that is not async-safe and can leave the process writing to a closed stream under concurrency, resource leaks in A2AClientManager due to unclosed HTTP clients, and potential memory leaks in _DummyConn. Additionally, I recommend moving the optional dependency import in tool_manager.py into the code path that uses it to prevent runtime failures, and using secrets.compare_digest for secure API key validation.
def _suppress_stdout():
    """Redirect stdout to devnull while running agent logic.

    LLMAgent.step() writes streaming tokens to sys.stdout, which
    would corrupt the ACP JSON-RPC wire when running over stdio.
    """
    real_stdout = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    try:
        yield
    finally:
        sys.stdout.close()
        sys.stdout = real_stdout

async def initialize(

This context manager is not async-safe. Since sys.stdout is a process-wide global, concurrent tasks interfere with each other. If Task A and Task B both enter this block, Task B captures Task A's devnull handle as its real_stdout. When Task A exits first, it closes whatever sys.stdout currently is (Task B's devnull) and restores the original stream, so Task B's remaining output reaches the real stdout and can corrupt the wire. When Task B then exits, it closes the original stdout and leaves sys.stdout pointing at Task A's devnull, so any code still holding the original stream fails with a ValueError on its next write and everything else the process prints is silently discarded. A better approach is to configure the agent to use a specific stream or quiet mode instead of monkeypatching globals.
@contextmanager
def _suppress_stdout():
    """Redirect stdout to devnull while running agent logic.

    ``LLMAgent.step()`` writes streaming tokens to ``sys.stdout``,
    which would corrupt any stdio-based transport.
    """
    real_stdout = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    try:
        yield
    finally:
        sys.stdout.close()
        sys.stdout = real_stdout

import json
from ms_agent.llm.utils import Tool, ToolCall
from ms_agent.tools.acp_agent_tool import ACPAgentTool

acp_tool = ACPAgentTool.from_config(config)
if acp_tool is not None:
    self.extra_tools.append(acp_tool)

Wrap the ACPAgentTool initialization in a try-except block to handle cases where the optional dependency is missing, and move the import here to avoid top-level failures.
try:
    from ms_agent.tools.acp_agent_tool import ACPAgentTool
    acp_tool = ACPAgentTool.from_config(config)
    if acp_tool is not None:
        self.extra_tools.append(acp_tool)
except ImportError:
    pass

http_client = httpx.AsyncClient(
    timeout=300.0, headers=auth_headers)

A new httpx.AsyncClient is created here but never closed, which will lead to resource leaks (file descriptors and sockets). Additionally, creating a new client for every request is inefficient. Consider using a context manager or reusing the existing self._http_client by passing headers to the request methods if the SDK allows it.
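One way to address both points is to cache one client per backend and close them all on shutdown. The following is a minimal sketch, not the A2AClientManager implementation: ClientPool and its factory argument are hypothetical, and it assumes the auth headers for a given key do not change between requests. In this PR the factory would presumably return httpx.AsyncClient(timeout=300.0, headers=auth_headers), which exposes the aclose() coroutine the sketch relies on.

```python
import asyncio
from typing import Any, Callable, Dict


class ClientPool:
    """Reuses one client per key and closes every client on shutdown.

    Hypothetical helper: `factory` is assumed to return an object with an
    async `aclose()` method, such as httpx.AsyncClient.
    """

    def __init__(self, factory: Callable[[str], Any]):
        self._factory = factory
        self._clients: Dict[str, Any] = {}

    def get(self, key: str) -> Any:
        """Return the cached client for `key`, creating it on first use."""
        client = self._clients.get(key)
        if client is None:
            client = self._factory(key)
            self._clients[key] = client
        return client

    async def aclose(self) -> None:
        """Close all cached clients and forget them."""
        clients = list(self._clients.values())
        self._clients.clear()
        # return_exceptions=True so one failing close does not skip the rest.
        await asyncio.gather(
            *(client.aclose() for client in clients),
            return_exceptions=True,
        )
```

The manager's own shutdown path would then await pool.aclose() once, instead of relying on garbage collection to release sockets.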
if not authorization:
    raise HTTPException(401, 'Authorization header required')
parts = authorization.split()
if len(parts) != 2 or parts[0].lower() != 'bearer' or parts[1] != _api_key:

The API key comparison uses a standard inequality operator, which is vulnerable to timing attacks. Use secrets.compare_digest for constant-time comparison of sensitive tokens. Note that you will need to add import secrets to the file.
- if len(parts) != 2 or parts[0].lower() != 'bearer' or parts[1] != _api_key:
+ if len(parts) != 2 or parts[0].lower() != 'bearer' or not secrets.compare_digest(parts[1], _api_key):
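
For reference, the check could also be factored into a small helper. This is a sketch under assumptions: bearer_token_matches is a hypothetical name, and both values are encoded to bytes first because secrets.compare_digest raises TypeError when given a str that contains non-ASCII characters, which a client-supplied header could.

```python
import secrets
from typing import Optional


def bearer_token_matches(authorization: Optional[str], api_key: str) -> bool:
    """Return True only for a well-formed 'Bearer <token>' header whose
    token equals api_key, using a constant-time comparison."""
    if not authorization:
        return False
    parts = authorization.split()
    if len(parts) != 2 or parts[0].lower() != 'bearer':
        return False
    # Encode to bytes so non-ASCII input is compared instead of raising.
    return secrets.compare_digest(
        parts[1].encode('utf-8'), api_key.encode('utf-8'))
```

The caller would keep raising HTTPException(401, ...) when the helper returns False, so the response behavior stays the same.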
server = None

def __init__(self):
    self._queues: dict[str, asyncio.Queue] = {}

self.session_store._sessions.pop(old_id, None)
self.session_store._sessions[proxy_sid] = entry

Change Summary
Related issue number
Checklist
pre-commit install and pre-commit run --all-files before git commit, and passed lint check.