diff --git a/astrbot/core/agent/mcp_client.py b/astrbot/core/agent/mcp_client.py index b75999ea65..c3e3f9825a 100644 --- a/astrbot/core/agent/mcp_client.py +++ b/astrbot/core/agent/mcp_client.py @@ -4,18 +4,13 @@ import os import re import sys +import httpx + from contextlib import AsyncExitStack from datetime import timedelta from pathlib import Path, PureWindowsPath from typing import Any, Generic - -from tenacity import ( - before_sleep_log, - retry, - retry_if_exception_type, - stop_after_attempt, - wait_exponential, -) +from mcp.shared.exceptions import McpError from astrbot import logger from astrbot.core.agent.run_context import ContextWrapper @@ -109,6 +104,47 @@ "Warning: Missing 'mcp' dependency or MCP library version too old, Streamable HTTP connection unavailable.", ) +# Transport/connection error types that should trigger automatic reconnection. +# Covers broken streams (anyio), HTTP errors like 404 (httpx), basic +# socket-level failures, and MCP protocol errors (e.g. Session terminated) +# so that a restarted MCP server can be recovered from. +_TRANSPORT_ERRORS: tuple[type[Exception], ...] = ( + anyio.ClosedResourceError, + anyio.BrokenResourceError, + anyio.EndOfStream, + ConnectionError, + ConnectionResetError, + BrokenPipeError, + httpx.HTTPStatusError, + httpx.ConnectError, + httpx.ReadError, + httpx.RemoteProtocolError, + McpError, +) + +# Keywords in exception messages that indicate a transport/connection problem. +_CONNECTION_ERROR_KEYWORDS = ( + "session terminated", + "terminated", + "closed resource", + "broken resource", + "connection closed", + "connection reset", +) + + +def _is_connection_error(exc: BaseException) -> bool: + """Return True if *exc* looks like a transport / connection failure. + + Checks both the exception hierarchy and the string representation so that + library-specific errors (e.g. MCP ``LifecycleError`` with "Session + terminated") are covered even if we cannot import them directly. + """ + if isinstance(exc, _TRANSPORT_ERRORS): + return True + msg = str(exc).lower() + return any(kw in msg for kw in _CONNECTION_ERROR_KEYWORDS) + def _prepare_config(config: dict) -> dict: """Prepare configuration, handle nested format""" @@ -589,7 +625,10 @@ async def call_tool_with_reconnect( arguments: dict, read_timeout_seconds: timedelta, ) -> mcp.types.CallToolResult: - """Call MCP tool with automatic reconnection on failure, max 2 retries. + """Call MCP tool with automatic reconnection on failure. + + Tries the call up to 3 times, reconnecting on transport / connection + errors (including library-specific errors like "Session terminated"). Args: tool_name: tool name @@ -601,19 +640,27 @@ async def call_tool_with_reconnect( Raises: ValueError: MCP session is not available - anyio.ClosedResourceError: raised after reconnection failure + Exception: raised after all retries are exhausted """ - @retry( - retry=retry_if_exception_type(anyio.ClosedResourceError), - stop=stop_after_attempt(2), - wait=wait_exponential(multiplier=1, min=1, max=3), - before_sleep=before_sleep_log(logger, logging.WARNING), - reraise=True, - ) - async def _call_with_retry(): + last_exc: BaseException | None = None + + for attempt in range(3): if not self.session: - raise ValueError("MCP session is not available for MCP function tools.") + # Session was cleared (e.g. by a previous failed reconnect). + # Try to reconnect instead of immediately failing. + logger.warning( + f"MCP session is not available, attempting to reconnect... (attempt {attempt + 1}/3)" + ) + try: + await self._reconnect() + except Exception as reconnect_err: + logger.error(f"Reconnection failed: {reconnect_err}") + if attempt == 2: + raise ValueError( + "MCP session is not available after reconnection attempts" + ) from reconnect_err + continue try: return await self.session.call_tool( @@ -621,16 +668,27 @@ async def _call_with_retry(): arguments=arguments, read_timeout_seconds=read_timeout_seconds, ) - except anyio.ClosedResourceError: + except BaseException as e: + last_exc = e + + if not _is_connection_error(e): + # Not a connection error – don't retry. + raise + logger.warning( - f"MCP tool {tool_name} call failed (ClosedResourceError), attempting to reconnect..." + f"MCP tool {tool_name} call failed ({type(e).__name__}: {e}), " + f"attempting to reconnect... (attempt {attempt + 1}/3)" ) - # Attempt to reconnect - await self._reconnect() - # Reraise the exception to trigger tenacity retry - raise - return await _call_with_retry() + try: + await self._reconnect() + except Exception as reconnect_err: + logger.error(f"Reconnection failed: {reconnect_err}") + if attempt == 2: + raise e from reconnect_err + + # Should not reach here, but just in case + raise last_exc # type: ignore[misc] async def cleanup(self) -> None: """Clean up resources including old exit stacks from reconnections"""