Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 134 additions & 72 deletions python/packages/core/agent_framework/_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
from mcp.shared.context import RequestContext
from mcp.shared.exceptions import McpError
from mcp.shared.session import RequestResponder
from opentelemetry import propagate
from opentelemetry import propagate, trace

from ._tools import FunctionTool
from ._types import (
Content,
Message,
)
from .exceptions import ToolException, ToolExecutionException
from .observability import OtelAttr, get_mcp_call_span

if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
Expand Down Expand Up @@ -487,6 +488,7 @@ def __init__(
self.is_connected: bool = False
self._tools_loaded: bool = False
self._prompts_loaded: bool = False
self._mcp_protocol_version: str | None = None

def __str__(self) -> str:
return f"MCPTool(name={self.name}, description={self.description})"
Expand Down Expand Up @@ -590,7 +592,8 @@ async def connect(self, *, reset: bool = False) -> None:
inner_exception=ex,
) from ex
try:
await session.initialize()
init_result = await session.initialize()
self._mcp_protocol_version = init_result.protocolVersion
except Exception as ex:
await self._safe_close_exit_stack()
# Provide context about initialization failure
Expand All @@ -605,7 +608,8 @@ async def connect(self, *, reset: bool = False) -> None:
self.session = session
elif self.session._request_id == 0: # type: ignore[reportPrivateUsage]
# If the session is not initialized, we need to reinitialize it
await self.session.initialize()
init_result = await self.session.initialize()
self._mcp_protocol_version = init_result.protocolVersion
logger.debug("Connected to MCP server: %s", self.session)
self.is_connected = True
if self.load_tools_flag:
Expand Down Expand Up @@ -927,50 +931,83 @@ async def call_tool(self, tool_name: str, **kwargs: Any) -> str | list[Content]:
}
}

# Inject OpenTelemetry trace context into MCP _meta for distributed tracing.
otel_meta = _inject_otel_into_mcp_meta()

parser = self.parse_tool_results or _parse_tool_result_from_mcp

# Try the operation, reconnecting once if the connection is closed
for attempt in range(2):
try:
result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=otel_meta) # type: ignore
if result.isError:
parsed = parser(result)
text = (
"\n".join(c.text for c in parsed if c.type == "text" and c.text)
if isinstance(parsed, list)
else str(parsed)
)
raise ToolExecutionException(text or str(parsed))
return parser(result)
except ToolExecutionException:
raise
except ClosedResourceError as cl_ex:
if attempt == 0:
# First attempt failed, try reconnecting
logger.info("MCP connection closed unexpectedly. Reconnecting...")
try:
await self.connect(reset=True)
continue # Retry the operation
except Exception as reconn_ex:
span_attributes: dict[str, Any] = {
OtelAttr.MCP_METHOD_NAME: "tools/call",
OtelAttr.TOOL_NAME: tool_name,
OtelAttr.OPERATION: OtelAttr.TOOL_EXECUTION_OPERATION,
OtelAttr.JSONRPC_PROTOCOL_VERSION: "2.0",
}
if self._mcp_protocol_version:
span_attributes[OtelAttr.MCP_PROTOCOL_VERSION] = self._mcp_protocol_version

with get_mcp_call_span(span_attributes) as span:
# Try the operation, reconnecting once if the connection is closed
for attempt in range(2):
try:
# Capture the JSON-RPC request ID before the call is made.
# The MCP SDK stores the next request ID in the private `_request_id`
# attribute; no public API is available. We use getattr with a default
# so this degrades gracefully if the attribute is renamed in a future
# version of the library.
request_id = getattr(self.session, "_request_id", None) # type: ignore[union-attr]
if request_id is not None:
span.set_attribute(OtelAttr.JSONRPC_REQUEST_ID, str(request_id))

# Inject OpenTelemetry trace context into MCP _meta for distributed tracing.
otel_meta = _inject_otel_into_mcp_meta()

result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=otel_meta) # type: ignore
if result.isError:
parsed = parser(result)
text = (
"\n".join(c.text for c in parsed if c.type == "text" and c.text)
if isinstance(parsed, list)
else str(parsed)
)
error_msg = text or str(parsed)
span.set_attribute(OtelAttr.ERROR_TYPE, "ToolError")
span.set_status(trace.StatusCode.ERROR, error_msg)
raise ToolExecutionException(error_msg)
return parser(result)
except ToolExecutionException:
raise
except ClosedResourceError as cl_ex:
if attempt == 0:
# First attempt failed, try reconnecting
logger.info("MCP connection closed unexpectedly. Reconnecting...")
try:
await self.connect(reset=True)
continue # Retry the operation
except Exception as reconn_ex:
error_type = type(reconn_ex).__name__
span.set_attribute(OtelAttr.ERROR_TYPE, error_type)
span.set_status(trace.StatusCode.ERROR, str(reconn_ex))
raise ToolExecutionException(
"Failed to reconnect to MCP server.",
inner_exception=reconn_ex,
) from reconn_ex
else:
# Second attempt also failed, give up
logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}")
span.set_attribute(OtelAttr.ERROR_TYPE, type(cl_ex).__name__)
span.set_status(trace.StatusCode.ERROR, str(cl_ex))
raise ToolExecutionException(
"Failed to reconnect to MCP server.",
inner_exception=reconn_ex,
) from reconn_ex
else:
# Second attempt also failed, give up
logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}")
raise ToolExecutionException(
f"Failed to call tool '{tool_name}' - connection lost.",
inner_exception=cl_ex,
) from cl_ex
except McpError as mcp_exc:
raise ToolExecutionException(mcp_exc.error.message, inner_exception=mcp_exc) from mcp_exc
except Exception as ex:
raise ToolExecutionException(f"Failed to call tool '{tool_name}'.", inner_exception=ex) from ex
raise ToolExecutionException(f"Failed to call tool '{tool_name}' after retries.")
f"Failed to call tool '{tool_name}' - connection lost.",
inner_exception=cl_ex,
) from cl_ex
except McpError as mcp_exc:
span.set_attribute(OtelAttr.ERROR_TYPE, str(mcp_exc.error.code))
span.set_status(trace.StatusCode.ERROR, mcp_exc.error.message)
raise ToolExecutionException(mcp_exc.error.message, inner_exception=mcp_exc) from mcp_exc
except Exception as ex:
span.set_attribute(OtelAttr.ERROR_TYPE, type(ex).__name__)
span.set_status(trace.StatusCode.ERROR, str(ex))
raise ToolExecutionException(f"Failed to call tool '{tool_name}'.", inner_exception=ex) from ex
span.set_attribute(OtelAttr.ERROR_TYPE, "RetryExhausted")
span.set_status(trace.StatusCode.ERROR, f"Failed to call tool '{tool_name}' after retries.")
raise ToolExecutionException(f"Failed to call tool '{tool_name}' after retries.")

async def get_prompt(self, prompt_name: str, **kwargs: Any) -> str:
"""Call a prompt with the given arguments.
Expand All @@ -995,35 +1032,60 @@ async def get_prompt(self, prompt_name: str, **kwargs: Any) -> str:

parser = self.parse_prompt_results or _parse_prompt_result_from_mcp

# Try the operation, reconnecting once if the connection is closed
for attempt in range(2):
try:
prompt_result = await self.session.get_prompt(prompt_name, arguments=kwargs) # type: ignore
return parser(prompt_result)
except ClosedResourceError as cl_ex:
if attempt == 0:
# First attempt failed, try reconnecting
logger.info("MCP connection closed unexpectedly. Reconnecting...")
try:
await self.connect(reset=True)
continue # Retry the operation
except Exception as reconn_ex:
span_attributes: dict[str, Any] = {
OtelAttr.MCP_METHOD_NAME: "prompts/get",
OtelAttr.PROMPT_NAME: prompt_name,
OtelAttr.JSONRPC_PROTOCOL_VERSION: "2.0",
}
if self._mcp_protocol_version:
span_attributes[OtelAttr.MCP_PROTOCOL_VERSION] = self._mcp_protocol_version

with get_mcp_call_span(span_attributes) as span:
# Try the operation, reconnecting once if the connection is closed
for attempt in range(2):
try:
# Capture the JSON-RPC request ID before the call is made.
# See call_tool for rationale on using getattr with a default.
request_id = getattr(self.session, "_request_id", None) # type: ignore[union-attr]
if request_id is not None:
span.set_attribute(OtelAttr.JSONRPC_REQUEST_ID, str(request_id))

prompt_result = await self.session.get_prompt(prompt_name, arguments=kwargs) # type: ignore
return parser(prompt_result)
except ClosedResourceError as cl_ex:
if attempt == 0:
# First attempt failed, try reconnecting
logger.info("MCP connection closed unexpectedly. Reconnecting...")
try:
await self.connect(reset=True)
continue # Retry the operation
except Exception as reconn_ex:
span.set_attribute(OtelAttr.ERROR_TYPE, type(reconn_ex).__name__)
span.set_status(trace.StatusCode.ERROR, str(reconn_ex))
raise ToolExecutionException(
"Failed to reconnect to MCP server.",
inner_exception=reconn_ex,
) from reconn_ex
else:
# Second attempt also failed, give up
logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}")
span.set_attribute(OtelAttr.ERROR_TYPE, type(cl_ex).__name__)
span.set_status(trace.StatusCode.ERROR, str(cl_ex))
raise ToolExecutionException(
"Failed to reconnect to MCP server.",
inner_exception=reconn_ex,
) from reconn_ex
else:
# Second attempt also failed, give up
logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}")
raise ToolExecutionException(
f"Failed to call prompt '{prompt_name}' - connection lost.",
inner_exception=cl_ex,
) from cl_ex
except McpError as mcp_exc:
raise ToolExecutionException(mcp_exc.error.message, inner_exception=mcp_exc) from mcp_exc
except Exception as ex:
raise ToolExecutionException(f"Failed to call prompt '{prompt_name}'.", inner_exception=ex) from ex
raise ToolExecutionException(f"Failed to get prompt '{prompt_name}' after retries.")
f"Failed to call prompt '{prompt_name}' - connection lost.",
inner_exception=cl_ex,
) from cl_ex
except McpError as mcp_exc:
span.set_attribute(OtelAttr.ERROR_TYPE, str(mcp_exc.error.code))
span.set_status(trace.StatusCode.ERROR, mcp_exc.error.message)
raise ToolExecutionException(mcp_exc.error.message, inner_exception=mcp_exc) from mcp_exc
except Exception as ex:
span.set_attribute(OtelAttr.ERROR_TYPE, type(ex).__name__)
span.set_status(trace.StatusCode.ERROR, str(ex))
raise ToolExecutionException(f"Failed to call prompt '{prompt_name}'.", inner_exception=ex) from ex
span.set_attribute(OtelAttr.ERROR_TYPE, "RetryExhausted")
span.set_status(trace.StatusCode.ERROR, f"Failed to get prompt '{prompt_name}' after retries.")
raise ToolExecutionException(f"Failed to get prompt '{prompt_name}' after retries.")

async def __aenter__(self) -> Self:
"""Enter the async context manager.
Expand Down
39 changes: 39 additions & 0 deletions python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"create_metric_views",
"create_resource",
"enable_instrumentation",
"get_mcp_call_span",
"get_meter",
"get_tracer",
]
Expand Down Expand Up @@ -272,6 +273,15 @@ class OtelAttr(str, Enum):
AGENT_CREATE_OPERATION = "create_agent"
AGENT_INVOKE_OPERATION = "invoke_agent"

# MCP-specific attributes
# https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/mcp.md
MCP_METHOD_NAME = "mcp.method.name"
MCP_PROTOCOL_VERSION = "mcp.protocol.version"
MCP_SESSION_ID = "mcp.session.id"
JSONRPC_REQUEST_ID = "jsonrpc.request.id"
JSONRPC_PROTOCOL_VERSION = "jsonrpc.protocol.version"
PROMPT_NAME = "gen_ai.prompt.name"

# Agent Framework specific attributes
MEASUREMENT_FUNCTION_TAG_NAME = "agent_framework.function.name"
MEASUREMENT_FUNCTION_INVOCATION_DURATION = "agent_framework.function.invocation.duration"
Expand Down Expand Up @@ -1683,6 +1693,35 @@ def get_function_span(
)


def get_mcp_call_span(
    attributes: dict[str, Any],
) -> _AgnosticContextManager[trace.Span]:
    """Open a CLIENT-kind span wrapping one MCP request (tool call or prompt fetch).

    The span name is built as ``"<method> <target>"``, where ``<method>`` is the
    ``mcp.method.name`` attribute (falling back to the literal ``"mcp"`` when the
    key is absent) and ``<target>`` is the tool name if one is set, otherwise the
    prompt name. When neither target attribute carries a truthy value, the span
    name is the method alone.

    Naming follows the OTel semantic conventions for MCP:
    https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/mcp.md

    Args:
        attributes: Attributes attached to the span at creation. Expected to
            carry ``mcp.method.name``; ``gen_ai.tool.name`` or
            ``gen_ai.prompt.name`` is consulted for the span-name target.

    Returns:
        A context manager that starts the span and makes it the current span.
    """
    operation = attributes.get(OtelAttr.MCP_METHOD_NAME, "mcp")

    # Tool name takes precedence; the prompt name is only a fallback.
    subject = attributes.get(OtelAttr.TOOL_NAME)
    if not subject:
        subject = attributes.get(OtelAttr.PROMPT_NAME)

    if subject:
        label = f"{operation} {subject}"
    else:
        label = operation

    tracer = get_tracer()
    # Automatic exception recording and status-on-exception are switched off
    # here, so the tracer is not asked to derive error state from a raised
    # exception; the span is still ended when the context exits.
    return tracer.start_as_current_span(
        name=label,
        kind=trace.SpanKind.CLIENT,
        attributes=attributes,
        record_exception=False,
        set_status_on_exception=False,
        end_on_exit=True,
    )


@contextlib.contextmanager
def _get_span(
attributes: dict[str, Any],
Expand Down
Loading
Loading