From 065c2a77b171311aa61e3ebd4330e2f958246ecd Mon Sep 17 00:00:00 2001 From: Daniel Garnier-Moiroux Date: Fri, 22 May 2026 09:38:18 +0200 Subject: [PATCH] Expose request URI in McpHttpClientAuthorizationErrorHandler Breaking change Signed-off-by: Daniel Garnier-Moiroux --- .../HttpClientStreamableHttpTransport.java | 54 +++++++-- .../client/transport/HttpRequestSnapshot.java | 23 ++++ ...ClientTransportAuthorizationException.java | 10 +- ...cpHttpClientAuthorizationErrorHandler.java | 9 ++ ...entTransportAuthorizationErrorHandler.java | 111 ++++++++++++++++++ ...tpClientAuthorizationErrorHandlerTest.java | 2 + ...ransportAuthorizationErrorHandlerTest.java | 53 +++++++++ ...eamableHttpTransportErrorHandlingTest.java | 49 +++++--- 8 files changed, 280 insertions(+), 31 deletions(-) create mode 100644 mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpRequestSnapshot.java create mode 100644 mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientTransportAuthorizationErrorHandler.java create mode 100644 mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientTransportAuthorizationErrorHandlerTest.java diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 142c0302c..a0d6bd2a3 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -24,6 +24,7 @@ import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent; import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer; import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler; +import io.modelcontextprotocol.client.transport.customizer.McpHttpClientTransportAuthorizationErrorHandler; import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer; import io.modelcontextprotocol.common.McpTransportContext; import io.modelcontextprotocol.json.McpJsonDefaults; @@ -120,7 +121,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { private final boolean openConnectionOnStartup; - private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler; + private final McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler; private final boolean resumableStreams; @@ -139,7 +140,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport { private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams, boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer, - McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List supportedProtocolVersions) { + McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler, + List supportedProtocolVersions) { this.jsonMapper = jsonMapper; this.httpClient = httpClient; this.requestBuilder = requestBuilder; @@ -295,10 +297,13 @@ private Mono reconnect(McpTransportStream stream) { int statusCode = responseEvent.responseInfo().statusCode(); if (statusCode == 401 || statusCode == 403) { logger.debug("Authorization error in reconnect with code {}", statusCode); + var request = requestBuilder.build(); + var requestSnapshot = new HttpRequestSnapshot(request.uri(), request.method(), + request.headers()); return Mono.error( new McpHttpClientTransportAuthorizationException( "Authorization error connecting to SSE stream", - responseEvent.responseInfo())); + responseEvent.responseInfo(), requestSnapshot)); } else if (statusCode == METHOD_NOT_ALLOWED) { logger.debug("The server does not support SSE streams, using request-response mode."); @@ -417,7 +422,8 @@ private Retry authorizationErrorRetrySpec() { return Mono.deferContextual(ctx -> { var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY); return Mono - .from(this.authorizationErrorHandler.handle(authException.getResponseInfo(), transportContext)) + .from(this.authorizationErrorHandler.handle(authException.getResponseInfo(), + authException.getRequestSnapshot(), transportContext)) .switchIfEmpty(Mono.just(false)) .flatMap(shouldRetry -> shouldRetry ? Mono.just(retrySignal.totalRetries()) : Mono.error(retrySignal.failure())); @@ -489,7 +495,6 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { return Mono .from(this.httpRequestCustomizer.customize(builder, "POST", uri, jsonBody, transportContext)); }).flatMapMany(requestBuilder -> Flux.create(responseEventSink -> { - // Create the async request with proper body subscriber selection Mono.fromFuture(this.httpClient .sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(responseEventSink)) @@ -502,12 +507,14 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { } })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); - })).flatMap(responseEvent -> { + }).flatMap(responseEvent -> { int statusCode = responseEvent.responseInfo().statusCode(); if (statusCode == 401 || statusCode == 403) { + var request = requestBuilder.build(); + var requestSnapshot = new HttpRequestSnapshot(request.uri(), request.method(), request.headers()); logger.debug("Authorization error in sendMessage with code {}", statusCode); return Mono.error(new McpHttpClientTransportAuthorizationException( - "Authorization error when sending message", responseEvent.responseInfo())); + "Authorization error when sending message", responseEvent.responseInfo(), requestSnapshot)); } if (transportSession.markInitialized( @@ -651,13 +658,12 @@ else if (statusCode == BAD_REQUEST) { if (ref != null) { transportSession.removeConnection(ref); } - }) - .contextWrite(deliveredSink.contextView()) - .subscribe(); + })).contextWrite(deliveredSink.contextView()).subscribe(); disposableRef.set(connection); transportSession.addConnection(connection); }); + } private static String sessionIdOrPlaceholder(McpTransportSession transportSession) { @@ -695,7 +701,7 @@ public static class Builder { private List supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05, ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25); - private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP; + private McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientTransportAuthorizationErrorHandler.NOOP; /** * Creates a new builder with the specified base URI. @@ -828,8 +834,34 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as * when sending a message. * @param authorizationErrorHandler the handler * @return this builder + * @deprecated in favor of + * {@link #authorizationErrorHandler(McpHttpClientTransportAuthorizationErrorHandler)} */ + @Deprecated(forRemoval = true, since = "2.0.0") public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) { + this.authorizationErrorHandler = new McpHttpClientTransportAuthorizationErrorHandler() { + @Override + public Publisher handle(java.net.http.HttpResponse.ResponseInfo responseInfo, + HttpRequestSnapshot requestSnapshot, McpTransportContext context) { + return authorizationErrorHandler.handle(responseInfo, context); + } + + @Override + public int maxRetries() { + return authorizationErrorHandler.maxRetries(); + } + }; + return this; + } + + /** + * Sets the handler to be used when the server responds with HTTP 401 or HTTP 403 + * when sending a message. + * @param authorizationErrorHandler the handler + * @return this builder + */ + public Builder authorizationErrorHandler( + McpHttpClientTransportAuthorizationErrorHandler authorizationErrorHandler) { this.authorizationErrorHandler = authorizationErrorHandler; return this; } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpRequestSnapshot.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpRequestSnapshot.java new file mode 100644 index 000000000..cbc0859f5 --- /dev/null +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpRequestSnapshot.java @@ -0,0 +1,23 @@ +/* + * Copyright 2026-2026 the original author or authors. + */ + +package io.modelcontextprotocol.client.transport; + +import java.net.URI; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublisher; + +/** + * Captures information about an HTTP request. We use this instead of passing the plain + * {@link HttpRequest} object because we want to avoid retaining a reference to the + * request's {@link BodyPublisher}. + * + * @param requestUri the HTTP request URI + * @param method the HTTP method + * @param headers the HTTP request headers + * @author Daniel Garnier-Moiroux + */ +public record HttpRequestSnapshot(URI requestUri, String method, HttpHeaders headers) { +} diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java index 31e5ae95e..a0f4514d3 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/McpHttpClientTransportAuthorizationException.java @@ -19,13 +19,21 @@ public class McpHttpClientTransportAuthorizationException extends McpTransportEx private final HttpResponse.ResponseInfo responseInfo; - public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo) { + private final HttpRequestSnapshot requestSnapshot; + + public McpHttpClientTransportAuthorizationException(String message, HttpResponse.ResponseInfo responseInfo, + HttpRequestSnapshot requestSnapshot) { super(message); this.responseInfo = responseInfo; + this.requestSnapshot = requestSnapshot; } public HttpResponse.ResponseInfo getResponseInfo() { return responseInfo; } + public HttpRequestSnapshot getRequestSnapshot() { + return requestSnapshot; + } + } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java index c98fac61d..a426516dd 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandler.java @@ -6,6 +6,7 @@ import java.net.http.HttpResponse; +import io.modelcontextprotocol.client.transport.HttpRequestSnapshot; import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException; import io.modelcontextprotocol.common.McpTransportContext; import org.reactivestreams.Publisher; @@ -20,7 +21,9 @@ * "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP * Specification: Authorization * @author Daniel Garnier-Moiroux + * @deprecated in favor of {@link McpHttpClientTransportAuthorizationErrorHandler} */ +@Deprecated(forRemoval = true, since = "2.0.0") public interface McpHttpClientAuthorizationErrorHandler { /** @@ -38,7 +41,10 @@ public interface McpHttpClientAuthorizationErrorHandler { * @param context the MCP client transport context * @return {@link Publisher} emitting true if the original request should be replayed, * false otherwise. + * @deprecated in favor of + * {@link McpHttpClientTransportAuthorizationErrorHandler#handle(HttpResponse.ResponseInfo, HttpRequestSnapshot, McpTransportContext)} */ + @Deprecated(forRemoval = true, since = "2.0.0") Publisher handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context); /** @@ -87,7 +93,10 @@ interface Sync { * @param responseInfo the HTTP response information * @param context the MCP client transport context * @return true if the original request should be replayed, false otherwise. + * @deprecated in favor of + * {@link McpHttpClientTransportAuthorizationErrorHandler.Sync#handle(HttpResponse.ResponseInfo, HttpRequestSnapshot, McpTransportContext)} */ + @Deprecated(forRemoval = true, since = "2.0.0") boolean handle(HttpResponse.ResponseInfo responseInfo, McpTransportContext context); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientTransportAuthorizationErrorHandler.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientTransportAuthorizationErrorHandler.java new file mode 100644 index 000000000..205f2541a --- /dev/null +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientTransportAuthorizationErrorHandler.java @@ -0,0 +1,111 @@ +/* + * Copyright 2026-2026 the original author or authors. + */ + +package io.modelcontextprotocol.client.transport.customizer; + +import java.net.URI; +import java.net.http.HttpResponse; + +import io.modelcontextprotocol.client.transport.HttpRequestSnapshot; +import io.modelcontextprotocol.client.transport.McpHttpClientTransportAuthorizationException; +import io.modelcontextprotocol.common.McpTransportContext; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +/** + * Handle security-related errors in HTTP-client based transports. This class handles MCP + * server responses with status code 401 and 403. + * + * @see MCP + * Specification: Authorization + * @author Daniel Garnier-Moiroux + */ +public interface McpHttpClientTransportAuthorizationErrorHandler { + + /** + * Handle authorization error (HTTP 401 or 403), and signal whether the HTTP request + * should be retried or not. If the publisher returns true, the original transport + * method (connect, sendMessage) will be replayed with the original arguments. + * Otherwise, the transport will throw an + * {@link McpHttpClientTransportAuthorizationException}, indicating the error status. + *

+ * If the returned {@link Publisher} errors, the error will be propagated to the + * calling method, to be handled by the caller. + *

+ * The number of retries is bounded by {@link #maxRetries()}. + * @param responseInfo the HTTP response information + * @param requestSnapshot the HTTP request snapshot that failed authorization + * @param context the MCP client transport context + * @return {@link Publisher} emitting true if the original request should be replayed, + * false otherwise. + */ + Publisher handle(HttpResponse.ResponseInfo responseInfo, HttpRequestSnapshot requestSnapshot, + McpTransportContext context); + + /** + * Maximum number of authorization error retries the transport will attempt. When the + * handler signals a retry via {@link #handle}, the transport will replay the original + * request at most this many times. If the authorization error persists after + * exhausting all retries, the transport will propagate the + * {@link McpHttpClientTransportAuthorizationException}. + *

+ * Defaults to {@code 1}. + * @return the maximum number of retries + */ + default int maxRetries() { + return 1; + } + + /** + * A no-op handler, used in the default use-case. + */ + McpHttpClientTransportAuthorizationErrorHandler NOOP = new Noop(); + + /** + * Create a {@link McpHttpClientTransportAuthorizationErrorHandler} from a synchronous + * handler. Will be subscribed on {@link Schedulers#boundedElastic()}. The handler may + * be blocking. + * @param handler the synchronous handler + * @return an async handler + */ + static McpHttpClientTransportAuthorizationErrorHandler fromSync(Sync handler) { + return (info, snapshot, context) -> Mono.fromCallable(() -> handler.handle(info, snapshot, context)) + .subscribeOn(Schedulers.boundedElastic()); + } + + /** + * Synchronous authorization error handler. + */ + interface Sync { + + /** + * Handle authorization error (HTTP 401 or 403), and signal whether the HTTP + * request should be retried or not. If the return value is true, the original + * transport method (connect, sendMessage) will be replayed with the original + * arguments. Otherwise, the transport will throw an + * {@link McpHttpClientTransportAuthorizationException}, indicating the error + * status. + * @param responseInfo the HTTP response information + * @param requestSnapshot the HTTP request snapshot that failed authorization + * @param context the MCP client transport context + * @return true if the original request should be replayed, false otherwise. + */ + boolean handle(HttpResponse.ResponseInfo responseInfo, HttpRequestSnapshot requestSnapshot, + McpTransportContext context); + + } + + class Noop implements McpHttpClientTransportAuthorizationErrorHandler { + + @Override + public Publisher handle(HttpResponse.ResponseInfo responseInfo, HttpRequestSnapshot requestSnapshot, + McpTransportContext context) { + return Mono.just(false); + } + + } + +} diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java index 2812522f5..6b4e24667 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientAuthorizationErrorHandlerTest.java @@ -13,7 +13,9 @@ /** * @author Daniel Garnier-Moiroux + * @deprecated use {@link McpHttpClientTransportAuthorizationErrorHandlerTest} */ +@Deprecated class McpHttpClientAuthorizationErrorHandlerTest { private final HttpResponse.ResponseInfo responseInfo = mock(HttpResponse.ResponseInfo.class); diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientTransportAuthorizationErrorHandlerTest.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientTransportAuthorizationErrorHandlerTest.java new file mode 100644 index 000000000..8303c1d11 --- /dev/null +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/transport/customizer/McpHttpClientTransportAuthorizationErrorHandlerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2026-2026 the original author or authors. + */ +package io.modelcontextprotocol.client.transport.customizer; + +import java.net.URI; +import java.net.http.HttpResponse; + +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.client.transport.HttpRequestSnapshot; +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +import static org.mockito.Mockito.mock; + +/** + * @author Daniel Garnier-Moiroux + */ +class McpHttpClientTransportAuthorizationErrorHandlerTest { + + private final HttpResponse.ResponseInfo responseInfo = mock(HttpResponse.ResponseInfo.class); + + private final HttpRequestSnapshot requestSnapshot = new HttpRequestSnapshot(URI.create("http://localhost/mcp"), + "GET", java.net.http.HttpHeaders.of(java.util.Map.of(), (a, b) -> true)); + + private final McpTransportContext context = McpTransportContext.EMPTY; + + @Test + void whenTrueThenRetry() { + McpHttpClientTransportAuthorizationErrorHandler handler = McpHttpClientTransportAuthorizationErrorHandler + .fromSync((info, snapshot, ctx) -> true); + StepVerifier.create(handler.handle(responseInfo, requestSnapshot, context)).expectNext(true).verifyComplete(); + } + + @Test + void whenFalseThenError() { + McpHttpClientTransportAuthorizationErrorHandler handler = McpHttpClientTransportAuthorizationErrorHandler + .fromSync((info, snapshot, ctx) -> false); + StepVerifier.create(handler.handle(responseInfo, requestSnapshot, context)).expectNext(false).verifyComplete(); + } + + @Test + void whenExceptionThenPropagate() { + McpHttpClientTransportAuthorizationErrorHandler handler = McpHttpClientTransportAuthorizationErrorHandler + .fromSync((info, snapshot, ctx) -> { + throw new IllegalStateException("sync handler error"); + }); + StepVerifier.create(handler.handle(responseInfo, requestSnapshot, context)) + .expectErrorMatches(t -> t instanceof IllegalStateException && t.getMessage().equals("sync handler error")) + .verify(); + } + +} diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportErrorHandlingTest.java b/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportErrorHandlingTest.java index 3457903a9..f2e4bb37f 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportErrorHandlingTest.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportErrorHandlingTest.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.net.URI; import java.net.http.HttpResponse; import java.time.Duration; import java.util.ArrayList; @@ -16,7 +17,8 @@ import java.util.function.Predicate; import com.sun.net.httpserver.HttpServer; -import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler; +import io.modelcontextprotocol.client.transport.HttpRequestSnapshot; +import io.modelcontextprotocol.client.transport.customizer.McpHttpClientTransportAuthorizationErrorHandler; import io.modelcontextprotocol.common.McpTransportContext; import io.modelcontextprotocol.server.transport.TomcatTestUtil; import io.modelcontextprotocol.spec.HttpHeaders; @@ -403,9 +405,12 @@ void invokeHandler(int httpStatus) { AtomicReference capturedResponseInfo = new AtomicReference<>(); AtomicReference capturedContext = new AtomicReference<>(); + AtomicReference capturedSnapshot = new AtomicReference<>(); + var authTransport = HttpClientStreamableHttpTransport.builder(HOST) - .authorizationErrorHandler((responseInfo, context) -> { + .authorizationErrorHandler((responseInfo, requestSnapshot, context) -> { capturedResponseInfo.set(responseInfo); + capturedSnapshot.set(requestSnapshot); capturedContext.set(context); return Mono.just(false); }) @@ -417,6 +422,8 @@ void invokeHandler(int httpStatus) { assertThat(processedMessagesCount.get()).isEqualTo(1); assertThat(capturedResponseInfo.get()).isNotNull(); assertThat(capturedResponseInfo.get().statusCode()).isEqualTo(httpStatus); + assertThat(capturedSnapshot.get()).isNotNull(); + assertThat(capturedSnapshot.get().requestUri().toString()).isEqualTo(HOST + "/mcp"); assertThat(capturedContext.get()).isNotNull(); StepVerifier.create(authTransport.closeGracefully()).verifyComplete(); @@ -440,7 +447,7 @@ void defaultHandler() { void retry() { serverResponseStatus.set(401); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) - .authorizationErrorHandler((responseInfo, context) -> { + .authorizationErrorHandler((responseInfo, requestSnapshot, context) -> { serverResponseStatus.set(200); return Mono.just(true); }) @@ -456,7 +463,7 @@ void retry() { void retryAtMostOnce() { serverResponseStatus.set(401); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) - .authorizationErrorHandler((responseInfo, context) -> Mono.just(true)) + .authorizationErrorHandler((responseInfo, requestSnapshot, context) -> Mono.just(true)) .build(); StepVerifier.create(authTransport.sendMessage(createTestRequestMessage())) .expectErrorMatches(authorizationError(401)) @@ -471,10 +478,10 @@ void retryAtMostOnce() { void customMaxRetries() { serverResponseStatus.set(401); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) - .authorizationErrorHandler(new McpHttpClientAuthorizationErrorHandler() { + .authorizationErrorHandler(new McpHttpClientTransportAuthorizationErrorHandler() { @Override public Publisher handle(HttpResponse.ResponseInfo responseInfo, - McpTransportContext context) { + HttpRequestSnapshot requestSnapshot, McpTransportContext context) { return Mono.just(true); } @@ -498,7 +505,7 @@ void noRetry() { serverResponseStatus.set(401); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) - .authorizationErrorHandler((responseInfo, context) -> Mono.just(false)) + .authorizationErrorHandler((responseInfo, requestSnapshot, context) -> Mono.just(false)) .build(); StepVerifier.create(authTransport.sendMessage(createTestRequestMessage())) @@ -513,8 +520,8 @@ void noRetry() { void propagateHandlerError() { serverResponseStatus.set(401); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) - .authorizationErrorHandler( - (responseInfo, context) -> Mono.error(new IllegalStateException("handler error"))) + .authorizationErrorHandler((responseInfo, requestUri, context) -> Mono + .error(new IllegalStateException("handler error"))) .build(); StepVerifier.create(authTransport.sendMessage(createTestRequestMessage())) @@ -529,7 +536,7 @@ void propagateHandlerError() { void emptyHandler() { serverResponseStatus.set(401); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) - .authorizationErrorHandler((responseInfo, context) -> Mono.empty()) + .authorizationErrorHandler((responseInfo, requestSnapshot, context) -> Mono.empty()) .build(); StepVerifier.create(authTransport.sendMessage(createTestRequestMessage())) @@ -552,11 +559,13 @@ void invokeHandler(int httpStatus) { AtomicReference capturedException = new AtomicReference<>(); AtomicReference capturedResponseInfo = new AtomicReference<>(); + AtomicReference capturedSnapshot = new AtomicReference<>(); AtomicReference capturedContext = new AtomicReference<>(); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) - .authorizationErrorHandler((responseInfo, context) -> { + .authorizationErrorHandler((responseInfo, requestSnapshot, context) -> { capturedResponseInfo.set(responseInfo); + capturedSnapshot.set(requestSnapshot); capturedContext.set(context); return Mono.just(false); }) @@ -572,6 +581,8 @@ void invokeHandler(int httpStatus) { assertThat(messages).isEmpty(); assertThat(capturedResponseInfo.get()).isNotNull(); assertThat(capturedResponseInfo.get().statusCode()).isEqualTo(httpStatus); + assertThat(capturedSnapshot.get()).isNotNull(); + assertThat(capturedSnapshot.get().requestUri().toString()).isEqualTo(HOST + "/mcp"); assertThat(capturedContext.get()).isNotNull(); assertThat(capturedException.get()).hasMessage("Authorization error connecting to SSE stream") .asInstanceOf(type(McpHttpClientTransportAuthorizationException.class)) @@ -606,7 +617,7 @@ void retry() { AtomicReference capturedException = new AtomicReference<>(); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) .openConnectionOnStartup(true) - .authorizationErrorHandler((responseInfo, context) -> { + .authorizationErrorHandler((responseInfo, requestSnapshot, context) -> { serverSseResponseStatus.set(200); return Mono.just(true); }) @@ -636,7 +647,7 @@ void retryAtMostOnce() { AtomicReference capturedException = new AtomicReference<>(); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) .openConnectionOnStartup(true) - .authorizationErrorHandler((responseInfo, context) -> { + .authorizationErrorHandler((responseInfo, requestSnapshot, context) -> { return Mono.just(true); }) .build(); @@ -661,10 +672,10 @@ void customMaxRetries() { AtomicReference capturedException = new AtomicReference<>(); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) .openConnectionOnStartup(true) - .authorizationErrorHandler(new McpHttpClientAuthorizationErrorHandler() { + .authorizationErrorHandler(new McpHttpClientTransportAuthorizationErrorHandler() { @Override public Publisher handle(HttpResponse.ResponseInfo responseInfo, - McpTransportContext context) { + HttpRequestSnapshot requestSnapshot, McpTransportContext context) { return Mono.just(true); } @@ -695,7 +706,7 @@ void noRetry() { AtomicReference capturedException = new AtomicReference<>(); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) .openConnectionOnStartup(true) - .authorizationErrorHandler((responseInfo, context) -> { + .authorizationErrorHandler((responseInfo, requestUri, context) -> { // if there was a retry, the request would succeed. serverSseResponseStatus.set(200); return Mono.just(false); @@ -720,7 +731,7 @@ void emptyHandler() { AtomicReference capturedException = new AtomicReference<>(); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) .openConnectionOnStartup(true) - .authorizationErrorHandler((responseInfo, context) -> Mono.empty()) + .authorizationErrorHandler((responseInfo, requestUri, context) -> Mono.empty()) .build(); authTransport.setExceptionHandler(capturedException::set); @@ -741,8 +752,8 @@ void propagateHandlerError() { AtomicReference capturedException = new AtomicReference<>(); var authTransport = HttpClientStreamableHttpTransport.builder(HOST) .openConnectionOnStartup(true) - .authorizationErrorHandler( - (responseInfo, context) -> Mono.error(new IllegalStateException("handler error"))) + .authorizationErrorHandler((responseInfo, requestUri, context) -> Mono + .error(new IllegalStateException("handler error"))) .build(); authTransport.setExceptionHandler(capturedException::set);