110 changes: 84 additions & 26 deletions astrbot/core/agent/mcp_client.py
@@ -4,18 +4,13 @@
 import os
 import re
 import sys
+import httpx

 from contextlib import AsyncExitStack
 from datetime import timedelta
 from pathlib import Path, PureWindowsPath
 from typing import Any, Generic

-from tenacity import (
-    before_sleep_log,
-    retry,
-    retry_if_exception_type,
-    stop_after_attempt,
-    wait_exponential,
-)
+from mcp.shared.exceptions import McpError

Contributor

critical

The import of McpError from mcp.shared.exceptions is performed at the top level. However, mcp is an optional dependency in this codebase (as indicated by the conditional import block on lines 91-98). If mcp is not installed, importing mcp_client.py will fail immediately with a ModuleNotFoundError at startup.

Please remove this top-level import and instead import McpError dynamically or handle its import failure gracefully.


from astrbot import logger
from astrbot.core.agent.run_context import ContextWrapper
@@ -109,6 +104,47 @@
         "Warning: Missing 'mcp' dependency or MCP library version too old, Streamable HTTP connection unavailable.",
     )

+# Transport/connection error types that should trigger automatic reconnection.
+# Covers broken streams (anyio), HTTP errors like 404 (httpx), basic
+# socket-level failures, and MCP protocol errors (e.g. Session terminated)
+# so that a restarted MCP server can be recovered from.
+_TRANSPORT_ERRORS: tuple[type[Exception], ...] = (
+    anyio.ClosedResourceError,
+    anyio.BrokenResourceError,
+    anyio.EndOfStream,
+    ConnectionError,
+    ConnectionResetError,
+    BrokenPipeError,
+    httpx.HTTPStatusError,
+    httpx.ConnectError,
+    httpx.ReadError,
+    httpx.RemoteProtocolError,
+    McpError,
+)
Comment on lines +111 to +123

Contributor

critical

Defining _TRANSPORT_ERRORS at module level with anyio.ClosedResourceError and McpError will raise a NameError or ModuleNotFoundError if either anyio or mcp is not installed (both are optional dependencies that are imported conditionally).

To prevent this, we should construct _TRANSPORT_ERRORS dynamically by safely attempting to import these optional exceptions.

_transport_errors_list: list[type[Exception]] = [
    ConnectionError,
    ConnectionResetError,
    BrokenPipeError,
    httpx.HTTPStatusError,
    httpx.ConnectError,
    httpx.ReadError,
    httpx.RemoteProtocolError,
]

try:
    import anyio
    _transport_errors_list.extend([
        anyio.ClosedResourceError,
        anyio.BrokenResourceError,
        anyio.EndOfStream,
    ])
except ImportError:
    pass

try:
    from mcp.shared.exceptions import McpError
    _transport_errors_list.append(McpError)
except ImportError:
    pass

_TRANSPORT_ERRORS = tuple(_transport_errors_list)
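
The same idea can be written without one try/except block per optional package. The following is only a sketch under assumptions: `optional_exception_types` and `TRANSPORT_ERRORS` are hypothetical names that do not appear in this PR, and the module and class names are the ones quoted in the comment above.

```python
import importlib


def optional_exception_types(module_name: str, *names: str) -> tuple:
    """Return the named exception classes from module_name.

    Returns an empty tuple when the module cannot be imported, and skips
    any name that is missing or is not an exception class.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:  # also covers ModuleNotFoundError
        return ()
    found = []
    for name in names:
        candidate = getattr(module, name, None)
        if isinstance(candidate, type) and issubclass(candidate, BaseException):
            found.append(candidate)
    return tuple(found)


# Hypothetical module-level constant, built without hard imports.
TRANSPORT_ERRORS = (
    ConnectionError,
    *optional_exception_types(
        "anyio", "ClosedResourceError", "BrokenResourceError", "EndOfStream"
    ),
    *optional_exception_types("mcp.shared.exceptions", "McpError"),
)
```

Note that `ConnectionResetError` and `BrokenPipeError` are subclasses of `ConnectionError`, so an `isinstance` check against `ConnectionError` already covers them.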


+# Keywords in exception messages that indicate a transport/connection problem.
+_CONNECTION_ERROR_KEYWORDS = (
+    "session terminated",
+    "terminated",
+    "closed resource",
+    "broken resource",
+    "connection closed",
+    "connection reset",
+)
Comment on lines +125 to +133

Contributor

suggestion (bug_risk): The generic keyword "terminated" may falsely classify unrelated errors as connection issues.

This broad substring match risks treating unrelated lifecycle or business errors as transport failures, triggering unnecessary reconnects. Consider restricting matching to more specific phrases (e.g., variants of the existing "session terminated") or otherwise tightening the condition so it only captures genuine connection/transport errors.

Suggested change
-# Keywords in exception messages that indicate a transport/connection problem.
-_CONNECTION_ERROR_KEYWORDS = (
-    "session terminated",
-    "terminated",
-    "closed resource",
-    "broken resource",
-    "connection closed",
-    "connection reset",
-)
+# Keywords in exception messages that indicate a transport/connection problem.
+# Note: avoid overly generic substrings (e.g. just "terminated") to prevent
+# misclassifying unrelated lifecycle or business errors as connection issues.
+_CONNECTION_ERROR_KEYWORDS = (
+    "session terminated",
+    "session was terminated",
+    "closed resource",
+    "broken resource",
+    "connection closed",
+    "connection reset",
+)
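
To make the false-positive risk concrete, here is a minimal sketch that compares the two keyword tuples against a made-up error message. The message text, the `matches` helper, and the tuple names are assumptions for illustration only; the keyword values are copied from the two versions discussed in this comment.

```python
# Keyword list as submitted in the PR (contains the bare "terminated").
BROAD_KEYWORDS = (
    "session terminated",
    "terminated",
    "closed resource",
    "broken resource",
    "connection closed",
    "connection reset",
)

# Keyword list from the suggested change.
NARROW_KEYWORDS = (
    "session terminated",
    "session was terminated",
    "closed resource",
    "broken resource",
    "connection closed",
    "connection reset",
)


def matches(message: str, keywords: tuple) -> bool:
    """Case-insensitive substring match, as in _is_connection_error."""
    lowered = message.lower()
    return any(keyword in lowered for keyword in keywords)


# Hypothetical business error that has nothing to do with the transport.
unrelated = "Job terminated by user request"
# Message of the kind the PR is trying to recover from.
transport = "Session terminated"

broad_result = (matches(unrelated, BROAD_KEYWORDS), matches(transport, BROAD_KEYWORDS))
narrow_result = (matches(unrelated, NARROW_KEYWORDS), matches(transport, NARROW_KEYWORDS))
```

With the broad list both messages match, so the unrelated error would trigger a reconnect; with the narrow list only the transport message matches.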



+def _is_connection_error(exc: BaseException) -> bool:

Contributor

medium

Change the parameter type of _is_connection_error from BaseException to Exception to align with catching Exception.

def _is_connection_error(exc: Exception) -> bool:

+    """Return True if *exc* looks like a transport / connection failure.
+
+    Checks both the exception hierarchy and the string representation so that
+    library-specific errors (e.g. MCP ``LifecycleError`` with "Session
+    terminated") are covered even if we cannot import them directly.
+    """
+    if isinstance(exc, _TRANSPORT_ERRORS):
+        return True
+    msg = str(exc).lower()
+    return any(kw in msg for kw in _CONNECTION_ERROR_KEYWORDS)
+
+
 def _prepare_config(config: dict) -> dict:
     """Prepare configuration, handle nested format"""
@@ -589,7 +625,10 @@ async def call_tool_with_reconnect(
         arguments: dict,
         read_timeout_seconds: timedelta,
     ) -> mcp.types.CallToolResult:
-        """Call MCP tool with automatic reconnection on failure, max 2 retries.
+        """Call MCP tool with automatic reconnection on failure.
+
+        Tries the call up to 3 times, reconnecting on transport / connection
+        errors (including library-specific errors like "Session terminated").

         Args:
             tool_name: tool name
@@ -601,36 +640,55 @@ async def call_tool_with_reconnect(

         Raises:
             ValueError: MCP session is not available
-            anyio.ClosedResourceError: raised after reconnection failure
+            Exception: raised after all retries are exhausted
         """

-        @retry(
-            retry=retry_if_exception_type(anyio.ClosedResourceError),
-            stop=stop_after_attempt(2),
-            wait=wait_exponential(multiplier=1, min=1, max=3),
-            before_sleep=before_sleep_log(logger, logging.WARNING),
-            reraise=True,
-        )
-        async def _call_with_retry():
+        last_exc: BaseException | None = None

Contributor

medium

Change the type annotation of last_exc from BaseException to Exception to align with catching Exception instead of BaseException.

        last_exc: Exception | None = None


+        for attempt in range(3):
             if not self.session:
-                raise ValueError("MCP session is not available for MCP function tools.")
+                # Session was cleared (e.g. by a previous failed reconnect).
+                # Try to reconnect instead of immediately failing.
+                logger.warning(
+                    f"MCP session is not available, attempting to reconnect... (attempt {attempt + 1}/3)"
+                )
+                try:
+                    await self._reconnect()
+                except Exception as reconnect_err:
+                    logger.error(f"Reconnection failed: {reconnect_err}")
+                    if attempt == 2:
+                        raise ValueError(
+                            "MCP session is not available after reconnection attempts"
+                        ) from reconnect_err
+                    continue

             try:
                 return await self.session.call_tool(
                     name=tool_name,
                     arguments=arguments,
                     read_timeout_seconds=read_timeout_seconds,
                 )
-            except anyio.ClosedResourceError:
+            except BaseException as e:

Contributor

medium

Catching BaseException is generally discouraged because it also intercepts exceptions that signal interpreter exit or task cancellation, such as KeyboardInterrupt, SystemExit, and asyncio.CancelledError. This can lead to unexpected behavior or prevent clean shutdown and cancellation.

Since all connection and transport-related exceptions inherit from Exception, we should catch Exception instead.

            except Exception as e:

Contributor

suggestion (bug_risk): Catching BaseException may swallow cancellations and other critical signals; consider restricting to Exception.

This will also catch asyncio.CancelledError, KeyboardInterrupt, and SystemExit. In async code that can break task cancellation and shutdown, especially if this layer retries or suppresses the error. Limiting the handler to Exception and letting cancellations/termination signals propagate is safer:

except Exception as e:
    ...
Suggested change
-            except BaseException as e:
+            except Exception as e:
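
The difference between the two handlers can be shown with a small, generic asyncio sketch. It is not the PR's handler (which re-raises anything `_is_connection_error` rejects); `call_catching` and `cancel_after_start` are hypothetical helpers, and the `sleep` stands in for a slow tool call. It assumes Python 3.8 or later, where `asyncio.CancelledError` derives from `BaseException`.

```python
import asyncio


async def call_catching(exc_type):
    """Simulate a slow call wrapped in a handler that catches exc_type."""
    try:
        await asyncio.sleep(10)
    except exc_type:
        # The handler treats whatever it caught as an ordinary failure.
        return "swallowed"
    return "finished"


async def cancel_after_start(exc_type):
    """Start the call, cancel it, and report what the caller observes."""
    task = asyncio.create_task(call_catching(exc_type))
    await asyncio.sleep(0)  # let the task run up to its first await
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"


def run_demo():
    """Run both variants and return their outcomes as a pair."""
    with_base = asyncio.run(cancel_after_start(BaseException))
    with_exception = asyncio.run(cancel_after_start(Exception))
    return with_base, with_exception
```

Under these assumptions `run_demo()` yields `("swallowed", "cancelled")`: the `BaseException` handler absorbs the cancellation request, while the `Exception` handler lets it propagate to the caller.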

+                last_exc = e
+
+                if not _is_connection_error(e):
+                    # Not a connection error – don't retry.
+                    raise
+
                 logger.warning(
-                    f"MCP tool {tool_name} call failed (ClosedResourceError), attempting to reconnect..."
+                    f"MCP tool {tool_name} call failed ({type(e).__name__}: {e}), "
+                    f"attempting to reconnect... (attempt {attempt + 1}/3)"
                 )
-                # Attempt to reconnect
-                await self._reconnect()
-                # Reraise the exception to trigger tenacity retry
-                raise
-
-        return await _call_with_retry()
+                try:
+                    await self._reconnect()
+                except Exception as reconnect_err:
+                    logger.error(f"Reconnection failed: {reconnect_err}")
+                    if attempt == 2:
+                        raise e from reconnect_err
+
+        # Should not reach here, but just in case
+        raise last_exc  # type: ignore[misc]

     async def cleanup(self) -> None:
         """Clean up resources including old exit stacks from reconnections"""
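
The retry-and-reconnect loop in this hunk can be reduced to a self-contained sketch for experimenting with its control flow. Everything here is an assumption for illustration: `FakeClient` is a hypothetical stand-in for the MCP client, only `ConnectionError` is treated as retryable, and the reconnect after the final attempt is skipped, which the PR's loop does not do. In the PR's version the trailing `raise last_exc` is reachable when the third call fails with a connection error and the following reconnect succeeds.

```python
import asyncio


class FakeClient:
    """Hypothetical client whose call fails a fixed number of times."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.reconnects = 0

    async def call(self) -> str:
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("connection reset")
        return "ok"

    async def reconnect(self) -> None:
        self.reconnects += 1


async def call_with_reconnect(client: FakeClient, max_attempts: int = 3) -> str:
    """Try the call up to max_attempts times, reconnecting between tries."""
    last_exc = None
    for attempt in range(max_attempts):
        try:
            return await client.call()
        except ConnectionError as exc:
            last_exc = exc
            if attempt == max_attempts - 1:
                break  # no attempt left, so a reconnect would be wasted
            await client.reconnect()
    if last_exc is None:
        raise ValueError("max_attempts must be at least 1")
    # Reached whenever every attempt failed with a retryable error.
    raise last_exc
```

A client that fails twice succeeds on the third attempt after two reconnects; one that keeps failing surfaces the last `ConnectionError` to the caller.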