1717import sys
1818
1919import click
20+ import httpx
2021from mcp.server.auth.provider import AccessToken, TokenVerifier
2122from mcp.server.auth.settings import AuthSettings
2223from mcp.server.fastmcp import FastMCP
2526logger = logging.getLogger(__name__)
2627
2728
28- class ConformanceTokenVerifier (TokenVerifier):
29+ class IntrospectionTokenVerifier (TokenVerifier):
2930 """
30- Token verifier for conformance testing .
31+ Token verifier that uses OAuth 2.0 Token Introspection (RFC 7662) .
3132
32- Validates Bearer tokens that start with 'test-token' or 'cc-token'
33- (as issued by the fake auth server) .
33+ Validates Bearer tokens by calling the authorization server's
34+ introspection endpoint .
3435 """
3536
37+ def __init__(self, auth_server_url: str):
38+ self._auth_server_url = auth_server_url.rstrip("/")
39+ self._introspection_endpoint: str | None = None
40+ self._http_client = httpx.AsyncClient()
41+
42+ async def _get_introspection_endpoint(self) -> str:
43+ """Discover the introspection endpoint from AS metadata."""
44+ if self._introspection_endpoint is not None:
45+ return self._introspection_endpoint
46+
47+ # Fetch AS metadata
48+ metadata_url = f"{self._auth_server_url}/.well-known/oauth-authorization-server"
49+ logger.debug(f"Fetching AS metadata from {metadata_url}")
50+
51+ response = await self._http_client.get(metadata_url)
52+ response.raise_for_status()
53+ metadata = response.json()
54+
55+ introspection_endpoint = metadata.get("introspection_endpoint")
56+ if not introspection_endpoint:
57+ raise ValueError("Authorization server does not advertise introspection_endpoint")
58+
59+ self._introspection_endpoint = introspection_endpoint
60+ logger.debug(f"Discovered introspection endpoint: {introspection_endpoint}")
61+ return introspection_endpoint
62+
3663 async def verify_token(self, token: str) -> AccessToken | None:
37- """Verify a bearer token and return access info if valid."""
38- # Accept tokens that start with 'test-token' or 'cc-token'
39- if token.startswith("test-token") or token.startswith("cc-token"):
64+ """Verify a bearer token using introspection and return access info if valid."""
65+ try:
66+ introspection_endpoint = await self._get_introspection_endpoint()
67+
68+ # Call introspection endpoint (RFC 7662)
69+ response = await self._http_client.post(
70+ introspection_endpoint,
71+ data={"token": token},
72+ headers={"Content-Type": "application/x-www-form-urlencoded"},
73+ )
74+ response.raise_for_status()
75+ result = response.json()
76+
77+ # Check if token is active
78+ if not result.get("active", False):
79+ logger.debug("Token introspection returned active=false")
80+ return None
81+
82+ # Extract token info from introspection response
83+ client_id: str = result.get("client_id", "unknown")
84+ scope_str: str = result.get("scope", "")
85+ scopes: list[str] = scope_str.split() if scope_str else []
86+ expires_at: int | None = result.get("exp")
87+
88+ logger.debug(f"Token verified for client {client_id} with scopes {scopes}")
4089 return AccessToken(
4190 token=token,
42- client_id="conformance-test-client",
43- scopes=["mcp:read", "mcp:write"],
91+ client_id=client_id,
92+ scopes=scopes,
93+ expires_at=expires_at,
4494 )
45- return None
95+ except Exception:
96+ logger.exception("Token introspection failed")
97+ return None
4698
4799
48100def create_server(auth_server_url: str, port: int) -> FastMCP:
@@ -51,7 +103,7 @@ def create_server(auth_server_url: str, port: int) -> FastMCP:
51103
52104 mcp = FastMCP(
53105 name="mcp-auth-test-server",
54- token_verifier=ConformanceTokenVerifier( ),
106+ token_verifier=IntrospectionTokenVerifier(auth_server_url ),
55107 auth=AuthSettings(
56108 issuer_url=AnyHttpUrl(auth_server_url),
57109 resource_server_url=AnyHttpUrl(base_url),
0 commit comments