-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
fix: MCP client auto reconnect when MCP session is not available #8682
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,18 +4,13 @@ | |||||||||||||||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||||||||||||||||
| import re | ||||||||||||||||||||||||||||||||||||||||||
| import sys | ||||||||||||||||||||||||||||||||||||||||||
| import httpx | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| from contextlib import AsyncExitStack | ||||||||||||||||||||||||||||||||||||||||||
| from datetime import timedelta | ||||||||||||||||||||||||||||||||||||||||||
| from pathlib import Path, PureWindowsPath | ||||||||||||||||||||||||||||||||||||||||||
| from typing import Any, Generic | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| from tenacity import ( | ||||||||||||||||||||||||||||||||||||||||||
| before_sleep_log, | ||||||||||||||||||||||||||||||||||||||||||
| retry, | ||||||||||||||||||||||||||||||||||||||||||
| retry_if_exception_type, | ||||||||||||||||||||||||||||||||||||||||||
| stop_after_attempt, | ||||||||||||||||||||||||||||||||||||||||||
| wait_exponential, | ||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||
| from mcp.shared.exceptions import McpError | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| from astrbot import logger | ||||||||||||||||||||||||||||||||||||||||||
| from astrbot.core.agent.run_context import ContextWrapper | ||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -109,6 +104,47 @@ | |||||||||||||||||||||||||||||||||||||||||
| "Warning: Missing 'mcp' dependency or MCP library version too old, Streamable HTTP connection unavailable.", | ||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # Transport/connection error types that should trigger automatic reconnection. | ||||||||||||||||||||||||||||||||||||||||||
| # Covers broken streams (anyio), HTTP errors like 404 (httpx), basic | ||||||||||||||||||||||||||||||||||||||||||
| # socket-level failures, and MCP protocol errors (e.g. Session terminated) | ||||||||||||||||||||||||||||||||||||||||||
| # so that a restarted MCP server can be recovered from. | ||||||||||||||||||||||||||||||||||||||||||
| _TRANSPORT_ERRORS: tuple[type[Exception], ...] = ( | ||||||||||||||||||||||||||||||||||||||||||
| anyio.ClosedResourceError, | ||||||||||||||||||||||||||||||||||||||||||
| anyio.BrokenResourceError, | ||||||||||||||||||||||||||||||||||||||||||
| anyio.EndOfStream, | ||||||||||||||||||||||||||||||||||||||||||
| ConnectionError, | ||||||||||||||||||||||||||||||||||||||||||
| ConnectionResetError, | ||||||||||||||||||||||||||||||||||||||||||
| BrokenPipeError, | ||||||||||||||||||||||||||||||||||||||||||
| httpx.HTTPStatusError, | ||||||||||||||||||||||||||||||||||||||||||
| httpx.ConnectError, | ||||||||||||||||||||||||||||||||||||||||||
| httpx.ReadError, | ||||||||||||||||||||||||||||||||||||||||||
| httpx.RemoteProtocolError, | ||||||||||||||||||||||||||||||||||||||||||
| McpError, | ||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+111
to
+123
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Defining To prevent this, we should construct _transport_errors_list: list[type[Exception]] = [
ConnectionError,
ConnectionResetError,
BrokenPipeError,
httpx.HTTPStatusError,
httpx.ConnectError,
httpx.ReadError,
httpx.RemoteProtocolError,
]
try:
import anyio
_transport_errors_list.extend([
anyio.ClosedResourceError,
anyio.BrokenResourceError,
anyio.EndOfStream,
])
except ImportError:
pass
try:
from mcp.shared.exceptions import McpError
_transport_errors_list.append(McpError)
except ImportError:
pass
_TRANSPORT_ERRORS = tuple(_transport_errors_list) |
||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # Keywords in exception messages that indicate a transport/connection problem. | ||||||||||||||||||||||||||||||||||||||||||
| _CONNECTION_ERROR_KEYWORDS = ( | ||||||||||||||||||||||||||||||||||||||||||
| "session terminated", | ||||||||||||||||||||||||||||||||||||||||||
| "terminated", | ||||||||||||||||||||||||||||||||||||||||||
| "closed resource", | ||||||||||||||||||||||||||||||||||||||||||
| "broken resource", | ||||||||||||||||||||||||||||||||||||||||||
| "connection closed", | ||||||||||||||||||||||||||||||||||||||||||
| "connection reset", | ||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+125
to
+133
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion (bug_risk): The generic keyword "terminated" may falsely classify unrelated errors as connection issues. This broad substring match risks treating unrelated lifecycle or business errors as transport failures, triggering unnecessary reconnects. Consider restricting matching to more specific phrases (e.g., variants of the existing "session terminated") or otherwise tightening the condition so it only captures genuine connection/transport errors.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| def _is_connection_error(exc: BaseException) -> bool: | ||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||||||||||||
| """Return True if *exc* looks like a transport / connection failure. | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| Checks both the exception hierarchy and the string representation so that | ||||||||||||||||||||||||||||||||||||||||||
| library-specific errors (e.g. MCP ``LifecycleError`` with "Session | ||||||||||||||||||||||||||||||||||||||||||
| terminated") are covered even if we cannot import them directly. | ||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||
| if isinstance(exc, _TRANSPORT_ERRORS): | ||||||||||||||||||||||||||||||||||||||||||
| return True | ||||||||||||||||||||||||||||||||||||||||||
| msg = str(exc).lower() | ||||||||||||||||||||||||||||||||||||||||||
| return any(kw in msg for kw in _CONNECTION_ERROR_KEYWORDS) | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| def _prepare_config(config: dict) -> dict: | ||||||||||||||||||||||||||||||||||||||||||
| """Prepare configuration, handle nested format""" | ||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -589,7 +625,10 @@ async def call_tool_with_reconnect( | |||||||||||||||||||||||||||||||||||||||||
| arguments: dict, | ||||||||||||||||||||||||||||||||||||||||||
| read_timeout_seconds: timedelta, | ||||||||||||||||||||||||||||||||||||||||||
| ) -> mcp.types.CallToolResult: | ||||||||||||||||||||||||||||||||||||||||||
| """Call MCP tool with automatic reconnection on failure, max 2 retries. | ||||||||||||||||||||||||||||||||||||||||||
| """Call MCP tool with automatic reconnection on failure. | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| Tries the call up to 3 times, reconnecting on transport / connection | ||||||||||||||||||||||||||||||||||||||||||
| errors (including library-specific errors like "Session terminated"). | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||||||
| tool_name: tool name | ||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -601,36 +640,55 @@ async def call_tool_with_reconnect( | |||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| Raises: | ||||||||||||||||||||||||||||||||||||||||||
| ValueError: MCP session is not available | ||||||||||||||||||||||||||||||||||||||||||
| anyio.ClosedResourceError: raised after reconnection failure | ||||||||||||||||||||||||||||||||||||||||||
| Exception: raised after all retries are exhausted | ||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| @retry( | ||||||||||||||||||||||||||||||||||||||||||
| retry=retry_if_exception_type(anyio.ClosedResourceError), | ||||||||||||||||||||||||||||||||||||||||||
| stop=stop_after_attempt(2), | ||||||||||||||||||||||||||||||||||||||||||
| wait=wait_exponential(multiplier=1, min=1, max=3), | ||||||||||||||||||||||||||||||||||||||||||
| before_sleep=before_sleep_log(logger, logging.WARNING), | ||||||||||||||||||||||||||||||||||||||||||
| reraise=True, | ||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||
| async def _call_with_retry(): | ||||||||||||||||||||||||||||||||||||||||||
| last_exc: BaseException | None = None | ||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| for attempt in range(3): | ||||||||||||||||||||||||||||||||||||||||||
| if not self.session: | ||||||||||||||||||||||||||||||||||||||||||
| raise ValueError("MCP session is not available for MCP function tools.") | ||||||||||||||||||||||||||||||||||||||||||
| # Session was cleared (e.g. by a previous failed reconnect). | ||||||||||||||||||||||||||||||||||||||||||
| # Try to reconnect instead of immediately failing. | ||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||
| f"MCP session is not available, attempting to reconnect... (attempt {attempt + 1}/3)" | ||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||
| await self._reconnect() | ||||||||||||||||||||||||||||||||||||||||||
| except Exception as reconnect_err: | ||||||||||||||||||||||||||||||||||||||||||
| logger.error(f"Reconnection failed: {reconnect_err}") | ||||||||||||||||||||||||||||||||||||||||||
| if attempt == 2: | ||||||||||||||||||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||||||||||||||||||
| "MCP session is not available after reconnection attempts" | ||||||||||||||||||||||||||||||||||||||||||
| ) from reconnect_err | ||||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||
| return await self.session.call_tool( | ||||||||||||||||||||||||||||||||||||||||||
| name=tool_name, | ||||||||||||||||||||||||||||||||||||||||||
| arguments=arguments, | ||||||||||||||||||||||||||||||||||||||||||
| read_timeout_seconds=read_timeout_seconds, | ||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||
| except anyio.ClosedResourceError: | ||||||||||||||||||||||||||||||||||||||||||
| except BaseException as e: | ||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Catching Since all connection and transport-related exceptions inherit from except Exception as e:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion (bug_risk): Catching This will also catch except Exception as e:
...
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||
| last_exc = e | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| if not _is_connection_error(e): | ||||||||||||||||||||||||||||||||||||||||||
| # Not a connection error – don't retry. | ||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||||||||||||||||||||||||
| f"MCP tool {tool_name} call failed (ClosedResourceError), attempting to reconnect..." | ||||||||||||||||||||||||||||||||||||||||||
| f"MCP tool {tool_name} call failed ({type(e).__name__}: {e}), " | ||||||||||||||||||||||||||||||||||||||||||
| f"attempting to reconnect... (attempt {attempt + 1}/3)" | ||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||
| # Attempt to reconnect | ||||||||||||||||||||||||||||||||||||||||||
| await self._reconnect() | ||||||||||||||||||||||||||||||||||||||||||
| # Reraise the exception to trigger tenacity retry | ||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| return await _call_with_retry() | ||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||
| await self._reconnect() | ||||||||||||||||||||||||||||||||||||||||||
| except Exception as reconnect_err: | ||||||||||||||||||||||||||||||||||||||||||
| logger.error(f"Reconnection failed: {reconnect_err}") | ||||||||||||||||||||||||||||||||||||||||||
| if attempt == 2: | ||||||||||||||||||||||||||||||||||||||||||
| raise e from reconnect_err | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| # Should not reach here, but just in case | ||||||||||||||||||||||||||||||||||||||||||
| raise last_exc # type: ignore[misc] | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
| async def cleanup(self) -> None: | ||||||||||||||||||||||||||||||||||||||||||
| """Clean up resources including old exit stacks from reconnections""" | ||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The import of
McpErrorfrommcp.shared.exceptionsis performed at the top level. However,mcpis an optional dependency in this codebase (as indicated by the conditional import block on lines 91-98). Ifmcpis not installed, importingmcp_client.pywill fail immediately with aModuleNotFoundErrorat startup.Please remove this top-level import and instead import
McpErrordynamically or handle its import failure gracefully.