Skip to content

support acp and a2a#904

Open
alcholiclg wants to merge 1 commit intomodelscope:mainfrom
alcholiclg:feat/acp_support
Open

support acp and a2a#904
alcholiclg wants to merge 1 commit intomodelscope:mainfrom
alcholiclg:feat/acp_support

Conversation

@alcholiclg
Copy link
Copy Markdown
Collaborator

Change Summary

  1. support for acp
  2. support for a2a
  3. add tests

Related issue number

Checklist

  • The pull request title is a good summary of the changes - it will be used in the changelog
  • Unit tests for the changes exist
  • Run pre-commit install and pre-commit run --all-files before git commit, and passed lint check.
  • Documentation reflects the changes where applicable

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces A2A (Agent-to-Agent) and ACP (Agent Client Protocol) support, including a proxy server for dispatching sessions to backend agents. My review identified several critical issues: the use of non-async-safe global stdout redirection in MSAgentA2AExecutor and MSAgentACPServer which will cause process-wide crashes under concurrency, resource leaks in A2AClientManager due to unclosed HTTP clients, and potential memory leaks in _DummyConn. Additionally, I recommended moving optional dependency imports inside tool_manager.py to prevent runtime failures and suggested using secrets.compare_digest for secure API key validation.

Comment thread ms_agent/acp/server.py
Comment on lines +76 to +90
def _suppress_stdout():
"""Redirect stdout to devnull while running agent logic.

LLMAgent.step() writes streaming tokens to sys.stdout, which
would corrupt the ACP JSON-RPC wire when running over stdio.
"""
real_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
try:
yield
finally:
sys.stdout.close()
sys.stdout = real_stdout

async def initialize(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This context manager is not async-safe. Since sys.stdout is a process-wide global, concurrent tasks will interfere with each other. Specifically, if Task A and Task B both enter this block, Task B will capture Task A's devnull as its real_stdout. When Task A finishes and closes its devnull, Task B will eventually restore sys.stdout to a closed file handle, causing all subsequent output operations in the process to crash with a ValueError. A better approach is to configure the agent to use a specific stream or quiet mode instead of monkeypatching globals.

Comment thread ms_agent/a2a/executor.py
Comment on lines +64 to +77
@contextmanager
def _suppress_stdout():
"""Redirect stdout to devnull while running agent logic.

``LLMAgent.step()`` writes streaming tokens to ``sys.stdout``,
which would corrupt any stdio-based transport.
"""
real_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
try:
yield
finally:
sys.stdout.close()
sys.stdout = real_stdout
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This context manager is not async-safe and will cause a process-wide crash under concurrent load. See the detailed explanation in the similar finding for ms_agent/acp/server.py.


import json
from ms_agent.llm.utils import Tool, ToolCall
from ms_agent.tools.acp_agent_tool import ACPAgentTool
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This top-level import will cause an ImportError if the optional agent-client-protocol dependency is not installed, breaking the entire ms-agent tool manager. This import should be moved inside the __init__ method and wrapped in a try-except block, similar to how A2AAgentTool is handled.

Comment on lines +92 to +94
acp_tool = ACPAgentTool.from_config(config)
if acp_tool is not None:
self.extra_tools.append(acp_tool)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Wrap the ACPAgentTool initialization in a try-except block to handle cases where the optional dependency is missing, and move the import here to avoid top-level failures.

        try:
            from ms_agent.tools.acp_agent_tool import ACPAgentTool
            acp_tool = ACPAgentTool.from_config(config)
            if acp_tool is not None:
                self.extra_tools.append(acp_tool)
        except ImportError:
            pass

Comment thread ms_agent/a2a/client.py
Comment on lines +58 to +59
http_client = httpx.AsyncClient(
timeout=300.0, headers=auth_headers)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

A new httpx.AsyncClient is created here but never closed, which will lead to resource leaks (file descriptors and sockets). Additionally, creating a new client for every request is inefficient. Consider using a context manager or reusing the existing self._http_client by passing headers to the request methods if the SDK allows it.

if not authorization:
raise HTTPException(401, 'Authorization header required')
parts = authorization.split()
if len(parts) != 2 or parts[0].lower() != 'bearer' or parts[1] != _api_key:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The API key comparison uses a standard inequality operator, which is vulnerable to timing attacks. Use secrets.compare_digest for constant-time comparison of sensitive tokens. Note that you will need to add import secrets to the file.

Suggested change
if len(parts) != 2 or parts[0].lower() != 'bearer' or parts[1] != _api_key:
if len(parts) != 2 or parts[0].lower() != 'bearer' or not secrets.compare_digest(parts[1], _api_key):

server = None

def __init__(self):
self._queues: dict[str, asyncio.Queue] = {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _queues dictionary in _DummyConn grows indefinitely as new sessions are created, leading to a memory leak. There is no mechanism to remove queues when a session is closed or evicted from the ACPSessionStore.

Comment thread ms_agent/acp/proxy.py
Comment on lines +312 to +313
self.session_store._sessions.pop(old_id, None)
self.session_store._sessions[proxy_sid] = entry
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Directly accessing and modifying the private _sessions attribute of ProxySessionStore violates encapsulation. The register method should be updated to accept a custom session_id or return the generated one for use, avoiding the need to manually pop and re-insert entries.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant