Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientTransportAuthorizationErrorHandler;
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.json.McpJsonDefaults;
Expand Down Expand Up @@ -120,7 +121,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {

private final boolean openConnectionOnStartup;

private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler;
private final McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler;

private final boolean resumableStreams;

Expand All @@ -139,7 +140,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List<String> supportedProtocolVersions) {
McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler,
List<String> supportedProtocolVersions) {
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
Expand Down Expand Up @@ -295,10 +297,13 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
int statusCode = responseEvent.responseInfo().statusCode();
if (statusCode == 401 || statusCode == 403) {
logger.debug("Authorization error in reconnect with code {}", statusCode);
var request = requestBuilder.build();
var requestSnapshot = new HttpRequestSnapshot(request.uri(), request.method(),
request.headers());
return Mono.<McpSchema.JSONRPCMessage>error(
new McpHttpClientTransportAuthorizationException(
"Authorization error connecting to SSE stream",
responseEvent.responseInfo()));
responseEvent.responseInfo(), requestSnapshot));
}
else if (statusCode == METHOD_NOT_ALLOWED) {
logger.debug("The server does not support SSE streams, using request-response mode.");
Expand Down Expand Up @@ -417,7 +422,8 @@ private Retry authorizationErrorRetrySpec() {
return Mono.deferContextual(ctx -> {
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono
.from(this.authorizationErrorHandler.handle(authException.getResponseInfo(), transportContext))
.from(this.authorizationErrorHandler.handle(authException.getResponseInfo(),
authException.getRequestSnapshot(), transportContext))
.switchIfEmpty(Mono.just(false))
.flatMap(shouldRetry -> shouldRetry ? Mono.just(retrySignal.totalRetries())
: Mono.error(retrySignal.failure()));
Expand Down Expand Up @@ -489,7 +495,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
return Mono
.from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext));
}).flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(responseEventSink -> {

// Create the async request with proper body subscriber selection
Mono.fromFuture(this.httpClient
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink))
Expand All @@ -502,12 +507,14 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
}
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();

})).flatMap(responseEvent -> {
}).flatMap(responseEvent -> {
int statusCode = responseEvent.responseInfo().statusCode();
if (statusCode == 401 || statusCode == 403) {
var request = requestBuilder.build();
var requestSnapshot = new HttpRequestSnapshot(request.uri(), request.method(), request.headers());
logger.debug("Authorization error in sendMessage with code {}", statusCode);
return Mono.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportAuthorizationException(
"Authorization error when sending message", responseEvent.responseInfo()));
"Authorization error when sending message", responseEvent.responseInfo(), requestSnapshot));
}

if (transportSession.markInitialized(
Expand Down Expand Up @@ -651,13 +658,12 @@ else if (statusCode == BAD_REQUEST) {
if (ref != null) {
transportSession.removeConnection(ref);
}
})
.contextWrite(deliveredSink.contextView())
.subscribe();
})).contextWrite(deliveredSink.contextView()).subscribe();

disposableRef.set(connection);
transportSession.addConnection(connection);
});

}

private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSession) {
Expand Down Expand Up @@ -695,7 +701,7 @@ public static class Builder {
private List<String> supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);

private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP;
private McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientTransportAuthorizationErrorHandler.NOOP;

/**
* Creates a new builder with the specified base URI.
Expand Down Expand Up @@ -828,8 +834,34 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
* when sending a message.
* @param authorizationErrorHandler the handler
* @return this builder
* @deprecated in favor of
* {@link #authorizationErrorHandler(McpHttpClientTransportAuthorizationErrorHandler)}
*/
@Deprecated(forRemoval = true, since = "2.0.0")
public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) {
this.authorizationErrorHandler = new McpHttpClientTransportAuthorizationErrorHandler() {
@Override
public Publisher<Boolean> handle(java.net.http.HttpResponse.ResponseInfo responseInfo,
HttpRequestSnapshot requestSnapshot, McpTransportContext context) {
return authorizationErrorHandler.handle(responseInfo, context);
}

@Override
public int maxRetries() {
return authorizationErrorHandler.maxRetries();
}
};
return this;
}

/**
* Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
* when sending a message.
* @param authorizationErrorHandler the handler
* @return this builder
*/
public Builder authorizationErrorHandler(
McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler) {
this.authorizationErrorHandler = authorizationErrorHandler;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2026-2026 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;

import java.net.URI;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;

/**
* Captures information about an HTTP request. We use this instead of passing the plain
* {@link HttpRequest} object because we want to avoid retaining a reference to the
* request's {@link BodyPublisher}.
*
* @param requestUri the HTTP request URI
* @param method the HTTP method
* @param headers the HTTP request headers
* @author Daniel Garnier-Moiroux
*/
public record HttpRequestSnapshot(URI requestUri, String method, HttpHeaders headers) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@ public class McpHttpClientTransportAuthorizationException extends McpTransportEx

private final HttpResponse.ResponseInfo responseInfo;

public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo) {
private final HttpRequestSnapshot requestSnapshot;

public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't mind reversing the order of response and request here either :)

HttpRequestSnapshot requestSnapshot) {
super(message);
this.responseInfo = responseInfo;
this.requestSnapshot = requestSnapshot;
}

public HttpResponse.ResponseInfo getResponseInfo() {
return responseInfo;
}

public HttpRequestSnapshot getRequestSnapshot() {
return requestSnapshot;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.net.http.HttpResponse;

import io.modelcontextprotocol.client.transport.HttpRequestSnapshot;
import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException;
import io.modelcontextprotocol.common.McpTransportContext;
import org.reactivestreams.Publisher;
Expand All @@ -20,7 +21,9 @@
* "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP
* Specification: Authorization</a>
* @author Daniel Garnier-Moiroux
* @deprecated in favor of {@link McpHttpClientTransportAuthorizationErrorHandler}
*/
@Deprecated(forRemoval = true, since = "2.0.0")
public interface McpHttpClientAuthorizationErrorHandler {

/**
Expand All @@ -38,7 +41,10 @@ public interface McpHttpClientAuthorizationErrorHandler {
* @param context the MCP client transport context
* @return {@link Publisher} emitting true if the original request should be replayed,
* false otherwise.
* @deprecated in favor of
* {@link McpHttpClientTransportAuthorizationErrorHandler#handle(HttpResponse.ResponseInfo, HttpRequestSnapshot, McpTransportContext)}
*/
@Deprecated(forRemoval = true, since = "2.0.0")
Publisher<Boolean> handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);

/**
Expand Down Expand Up @@ -87,7 +93,10 @@ interface Sync {
* @param responseInfo the HTTP response information
* @param context the MCP client transport context
* @return true if the original request should be replayed, false otherwise.
* @deprecated in favor of
* {@link McpHttpClientTransportAuthorizationErrorHandler.Sync#handle(HttpResponse.ResponseInfo, HttpRequestSnapshot, McpTransportContext)}
*/
@Deprecated(forRemoval = true, since = "2.0.0")
boolean handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context);

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2026-2026 the original author or authors.
*/

package io.modelcontextprotocol.client.transport.customizer;

import java.net.URI;
import java.net.http.HttpResponse;

import io.modelcontextprotocol.client.transport.HttpRequestSnapshot;
import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException;
import io.modelcontextprotocol.common.McpTransportContext;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Handle security-related errors in HTTP-client based transports. This class handles MCP
* server responses with status code 401 and 403.
*
* @see <a href=
* "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP
* Specification: Authorization</a>
* @author Daniel Garnier-Moiroux
*/
public interface McpHttpClientTransportAuthorizationErrorHandler {

/**
* Handle authorization error (HTTP 401 or 403), and signal whether the HTTP request
* should be retried or not. If the publisher returns true, the original transport
* method (connect, sendMessage) will be replayed with the original arguments.
* Otherwise, the transport will throw an
* {@link McpHttpClientTransportAuthorizationException}, indicating the error status.
* <p>
* If the returned {@link Publisher} errors, the error will be propagated to the
* calling method, to be handled by the caller.
* <p>
* The number of retries is bounded by {@link #maxRetries()}.
* @param responseInfo the HTTP response information
* @param requestSnapshot the HTTP request snapshot that failed authorization
* @param context the MCP client transport context
* @return {@link Publisher} emitting true if the original request should be replayed,
* false otherwise.
*/
Publisher<Boolean> handle(HttpResponse.ResponseInfo responseInfo, HttpRequestSnapshot requestSnapshot,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow having the response info before the request info feels counter-intuitive. Can we make the requestSnapshot be the first argument?

McpTransportContext context);

/**
* Maximum number of authorization error retries the transport will attempt. When the
* handler signals a retry via {@link #handle}, the transport will replay the original
* request at most this many times. If the authorization error persists after
* exhausting all retries, the transport will propagate the
* {@link McpHttpClientTransportAuthorizationException}.
* <p>
* Defaults to {@code 1}.
* @return the maximum number of retries
*/
default int maxRetries() {
return 1;
}

/**
* A no-op handler, used in the default use-case.
*/
McpHttpClientTransportAuthorizationErrorHandler NOOP = new Noop();

/**
* Create a {@link McpHttpClientTransportAuthorizationErrorHandler} from a synchronous
* handler. Will be subscribed on {@link Schedulers#boundedElastic()}. The handler may
* be blocking.
* @param handler the synchronous handler
* @return an async handler
*/
static McpHttpClientTransportAuthorizationErrorHandler fromSync(Sync handler) {
return (info, snapshot, context) -> Mono.fromCallable(() -> handler.handle(info, snapshot, context))
.subscribeOn(Schedulers.boundedElastic());
}

/**
* Synchronous authorization error handler.
*/
interface Sync {

/**
* Handle authorization error (HTTP 401 or 403), and signal whether the HTTP
* request should be retried or not. If the return value is true, the original
* transport method (connect, sendMessage) will be replayed with the original
* arguments. Otherwise, the transport will throw an
* {@link McpHttpClientTransportAuthorizationException}, indicating the error
* status.
* @param responseInfo the HTTP response information
* @param requestSnapshot the HTTP request snapshot that failed authorization
* @param context the MCP client transport context
* @return true if the original request should be replayed, false otherwise.
*/
boolean handle(HttpResponse.ResponseInfo responseInfo, HttpRequestSnapshot requestSnapshot,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, let's change the order here too.

McpTransportContext context);

}

class Noop implements McpHttpClientTransportAuthorizationErrorHandler {

@Override
public Publisher<Boolean> handle(HttpResponse.ResponseInfo responseInfo, HttpRequestSnapshot requestSnapshot,
McpTransportContext context) {
return Mono.just(false);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

/**
* @author Daniel Garnier-Moiroux
* @deprecated use {@link McpHttpClientTransportAuthorizationErrorHandlerTest}
*/
@Deprecated
class McpHttpClientAuthorizationErrorHandlerTest {

private final HttpResponse.ResponseInfo responseInfo = mock(HttpResponse.ResponseInfo.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2026-2026 the original author or authors.
*/
package io.modelcontextprotocol.client.transport.customizer;

import java.net.URI;
import java.net.http.HttpResponse;

import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.client.transport.HttpRequestSnapshot;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

import static org.mockito.Mockito.mock;

/**
* @author Daniel Garnier-Moiroux
*/
class McpHttpClientTransportAuthorizationErrorHandlerTest {

private final HttpResponse.ResponseInfo responseInfo = mock(HttpResponse.ResponseInfo.class);

private final HttpRequestSnapshot requestSnapshot = new HttpRequestSnapshot(URI.create("http://localhost/mcp"),
"GET", java.net.http.HttpHeaders.of(java.util.Map.of(), (a, b) -> true));

private final McpTransportContext context = McpTransportContext.EMPTY;

@Test
void whenTrueThenRetry() {
McpHttpClientTransportAuthorizationErrorHandler handler = McpHttpClientTransportAuthorizationErrorHandler
.fromSync((info, snapshot, ctx) -> true);
StepVerifier.create(handler.handle(responseInfo, requestSnapshot, context)).expectNext(true).verifyComplete();
}

@Test
void whenFalseThenError() {
McpHttpClientTransportAuthorizationErrorHandler handler = McpHttpClientTransportAuthorizationErrorHandler
.fromSync((info, snapshot, ctx) -> false);
StepVerifier.create(handler.handle(responseInfo, requestSnapshot, context)).expectNext(false).verifyComplete();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test name and the behaviour don't seem to match. The test verifies completion, while the test name mentions error. Should it be aligned?

}

@Test
void whenExceptionThenPropagate() {
McpHttpClientTransportAuthorizationErrorHandler handler = McpHttpClientTransportAuthorizationErrorHandler
.fromSync((info, snapshot, ctx) -> {
throw new IllegalStateException("sync handler error");
});
StepVerifier.create(handler.handle(responseInfo, requestSnapshot, context))
.expectErrorMatches(t -> t instanceof IllegalStateException && t.getMessage().equals("sync handler error"))
.verify();
}

}
Loading
Loading