From cce7b8882b30ec65248ee548ab5d7edd7c8da76e Mon Sep 17 00:00:00 2001 From: Jegadeesan Muthusamy Date: Wed, 22 Apr 2026 09:59:49 +0530 Subject: [PATCH] Fix for 48726,48868 https://github.com/Azure/azure-sdk-for-java/issues/48726 https://github.com/Azure/azure-sdk-for-java/issues/48868 https://github.com/Azure/azure-sdk-for-java/issues/48726 https://github.com/Azure/azure-sdk-for-java/issues/48868 --- .../azure/ai/agents/AgentsClientBuilder.java | 75 ++++++- .../http/AzureHttpResponseAdapter.java | 104 ++++++++- .../implementation/http/HttpClientHelper.java | 74 +++++- .../http/HttpClientHelperTests.java | 211 ++++++++++++++++++ 4 files changed, 445 insertions(+), 19 deletions(-) diff --git a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/AgentsClientBuilder.java b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/AgentsClientBuilder.java index 8d2d19074e02..43826b9359b2 100644 --- a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/AgentsClientBuilder.java +++ b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/AgentsClientBuilder.java @@ -41,6 +41,7 @@ import com.openai.client.okhttp.OpenAIOkHttpClient; import com.openai.client.okhttp.OpenAIOkHttpClientAsync; import com.openai.credential.BearerTokenCredential; +import com.openai.credential.Credential; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -208,6 +209,49 @@ public AgentsClientBuilder credential(TokenCredential tokenCredential) { return this; } + /* + * Custom header name carrying the API key (e.g. "api-key", "Ocp-Apim-Subscription-Key"). + */ + private String apiKeyHeaderName; + + /* + * API key value sent on every request via {@link #apiKeyHeaderName}. When set, bearer-token + * authentication is bypassed and callers do not need to supply a {@link TokenCredential}. + */ + private String apiKey; + + /** + * Configures API-key authentication using the default {@code api-key} header, which matches + * the Azure OpenAI convention. 
+ * + * @param apiKey the API key value + * @return the {@link AgentsClientBuilder}. + */ + public AgentsClientBuilder apiKey(String apiKey) { + return apiKey("api-key", apiKey); + } + + /** + * Configures API-key authentication using a caller-supplied header name. This is used when the + * agents endpoint is fronted by a gateway that requires a custom auth header (for example + * Azure API Management's {@code Ocp-Apim-Subscription-Key}). + * + *
<p>
When API-key auth is configured, {@link #credential(TokenCredential)} is ignored and the + * OpenAI client receives a placeholder bearer credential — auth is applied by the Azure + * pipeline via an {@link AddHeadersPolicy}.
</p>
+ * + * @param headerName the header name carrying the API key + * @param apiKey the API key value + * @return the {@link AgentsClientBuilder}. + */ + public AgentsClientBuilder apiKey(String headerName, String apiKey) { + Objects.requireNonNull(headerName, "'headerName' cannot be null."); + Objects.requireNonNull(apiKey, "'apiKey' value cannot be null. Ensure the API key is properly configured."); + this.apiKeyHeaderName = headerName; + this.apiKey = apiKey; + return this; + } + /* * The service endpoint */ @@ -306,7 +350,11 @@ private HttpPipeline createHttpPipeline() { HttpPolicyProviders.addBeforeRetryPolicies(policies); policies.add(ClientBuilderUtil.validateAndGetRetryPolicy(retryPolicy, retryOptions, new RetryPolicy())); policies.add(new AddDatePolicy()); - if (tokenCredential != null) { + if (apiKey != null) { + HttpHeaders apiKeyHeaders = new HttpHeaders(); + apiKeyHeaders.set(apiKeyHeaderName, apiKey); + policies.add(new AddHeadersPolicy(apiKeyHeaders)); + } else if (tokenCredential != null) { policies.add(new BearerTokenAuthenticationPolicy(tokenCredential, DEFAULT_SCOPES)); } this.pipelinePolicies.stream() @@ -366,9 +414,7 @@ public OpenAIClientAsync buildOpenAIAsyncClient() { } private OpenAIOkHttpClient.Builder getOpenAIClientBuilder() { - OpenAIOkHttpClient.Builder builder = OpenAIOkHttpClient.builder() - .credential( - BearerTokenCredential.create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES))); + OpenAIOkHttpClient.Builder builder = OpenAIOkHttpClient.builder().credential(resolveOpenAICredential()); builder.baseUrl(this.endpoint + (this.endpoint.endsWith("/") ? "openai/v1" : "/openai/v1")); // We set the builder retries to 0 to avoid conflicts with the retry policy added through the HttpPipeline. 
builder.maxRetries(0); @@ -376,15 +422,30 @@ private OpenAIOkHttpClient.Builder getOpenAIClientBuilder() { } private OpenAIOkHttpClientAsync.Builder getOpenAIAsyncClientBuilder() { - OpenAIOkHttpClientAsync.Builder builder = OpenAIOkHttpClientAsync.builder() - .credential( - BearerTokenCredential.create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES))); + OpenAIOkHttpClientAsync.Builder builder + = OpenAIOkHttpClientAsync.builder().credential(resolveOpenAICredential()); builder.baseUrl(this.endpoint + (this.endpoint.endsWith("/") ? "openai/v1" : "/openai/v1")); // We set the builder retries to 0 to avoid conflicts with the retry policy added through the HttpPipeline. builder.maxRetries(0); return builder; } + // OpenAI SDK requires a non-null credential at build time. For API-key auth the real + // auth header is injected by the Azure pipeline (AddHeadersPolicy), so we hand the + // OpenAI client a placeholder bearer token that is never used by the server. + private static final String PLACEHOLDER_BEARER_TOKEN = "unused"; + + private Credential resolveOpenAICredential() { + if (apiKeyHeaderName != null) { + return BearerTokenCredential.create(PLACEHOLDER_BEARER_TOKEN); + } + if (tokenCredential == null) { + throw LOGGER.logExceptionAsError(new IllegalStateException( + "No credential configured: call apiKey() or credential() before building the client.")); + } + return BearerTokenCredential.create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES)); + } + private static final ClientLogger LOGGER = new ClientLogger(AgentsClientBuilder.class); /** diff --git a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/AzureHttpResponseAdapter.java b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/AzureHttpResponseAdapter.java index e299a62a1d48..e7f9991a673b 100644 --- a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/AzureHttpResponseAdapter.java 
+++ b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/AzureHttpResponseAdapter.java @@ -6,10 +6,15 @@ import com.azure.core.http.HttpHeader; import com.azure.core.http.HttpHeaders; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.logging.LogLevel; import com.openai.core.http.Headers; import com.openai.core.http.HttpResponse; +import reactor.core.publisher.Flux; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; /** * Adapter that exposes an Azure {@link com.azure.core.http.HttpResponse} as an OpenAI {@link HttpResponse}. This keeps @@ -42,7 +47,43 @@ public Headers headers() { @Override public InputStream body() { - return azureResponse.getBodyAsBinaryData().toStream(); + // getBodyAsBinaryData().toStream() blocks until the entire Flux + // drains (FluxByteBufferContent.toStream() collects into a byte[] via .block()), + // which breaks SSE progressive delivery. Iterate the Flux lazily so chunks + // reach the SSE parser as they arrive on the wire. + Flux body = azureResponse.getBody(); + if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) { + body = tapLines(body); + } + return new FluxByteBufferInputStream(body.toIterable().iterator()); + } + + /** + * Taps the body Flux to log each complete {@code \n}-terminated line at VERBOSE. A + * {@link StringBuilder} buffers partial lines across chunk boundaries so a line + * split across two {@link ByteBuffer}s is logged once, on arrival of the chunk + * that completes it. 
+ */ + private static Flux tapLines(Flux body) { + StringBuilder lineBuffer = new StringBuilder(); + return body.doOnNext(buffer -> { + ByteBuffer view = buffer.duplicate(); + byte[] bytes = new byte[view.remaining()]; + view.get(bytes); + lineBuffer.append(new String(bytes, StandardCharsets.UTF_8)); + int newline; + while ((newline = lineBuffer.indexOf("\n")) >= 0) { + String line = lineBuffer.substring(0, newline); + lineBuffer.delete(0, newline + 1); + if (!line.isEmpty()) { + LOGGER.verbose("SSE line: {}", line); + } + } + }).doOnComplete(() -> { + if (lineBuffer.length() > 0) { + LOGGER.verbose("SSE line (trailing, no newline): {}", lineBuffer); + } + }); } @Override @@ -60,4 +101,65 @@ private static Headers toOpenAiHeaders(HttpHeaders httpHeaders) { } return builder.build(); } + + /** + * {@link InputStream} that lazily pulls {@link ByteBuffer} chunks from a reactor + * {@code Flux} iterator. Each {@code hasNext()} parks until the next + * {@code onNext} signal, so reads complete as soon as a chunk arrives on the wire + * rather than waiting for the full response to be buffered. + */ + private static final class FluxByteBufferInputStream extends InputStream { + + private final Iterator buffers; + private ByteBuffer current; + + FluxByteBufferInputStream(Iterator buffers) { + this.buffers = buffers; + } + + @Override + public int read() { + ByteBuffer buffer = nextBuffer(); + if (buffer == null) { + return -1; + } + return buffer.get() & 0xff; + } + + @Override + public int read(byte[] out, int offset, int length) { + // Return after draining the current buffer. Do NOT loop into nextBuffer() + // to top up `length` — that would block on the Flux waiting for the next + // chunk and stall SSE reads until BufferedReader's 8KB request is filled, + // which defeats progressive delivery. 
+ ByteBuffer buffer = nextBuffer(); + if (buffer == null) { + return -1; + } + int toTransfer = Math.min(buffer.remaining(), length); + buffer.get(out, offset, toTransfer); + return toTransfer; + } + + @Override + public int available() { + ByteBuffer buffer = current; + return buffer == null ? 0 : buffer.remaining(); + } + + private ByteBuffer nextBuffer() { + if (current != null && current.hasRemaining()) { + return current; + } + while (buffers.hasNext()) { + ByteBuffer candidate = buffers.next(); + if (candidate.hasRemaining()) { + current = candidate.duplicate(); + return current; + } + } + current = null; + return null; + } + } } diff --git a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/HttpClientHelper.java b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/HttpClientHelper.java index b0060f6301a8..c6e1fa5d4bf3 100644 --- a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/HttpClientHelper.java +++ b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/HttpClientHelper.java @@ -30,14 +30,17 @@ import com.openai.errors.UnexpectedStatusCodeException; import com.openai.errors.UnprocessableEntityException; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.io.ByteArrayOutputStream; import java.net.MalformedURLException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; /** * Utility entry point that adapts an Azure {@link com.azure.core.http.HttpClient} so it can be consumed by @@ -87,9 +90,9 @@ public HttpResponse execute(HttpRequest request, RequestOptions requestOptions) Objects.requireNonNull(requestOptions, "requestOptions"); try { - com.azure.core.http.HttpRequest azureRequest = buildAzureRequest(request); - return new 
AzureHttpResponseAdapter( - this.httpPipeline.sendSync(azureRequest, buildRequestContext(requestOptions))); + PreparedRequest prepared = buildAzureRequest(request); + return new AzureHttpResponseAdapter(this.httpPipeline.sendSync(prepared.azureRequest, + buildRequestContext(requestOptions, prepared.isStreaming))); } catch (MalformedURLException exception) { throw new OpenAIException("Invalid URL in request: " + exception.getMessage(), LOGGER.logThrowableAsError(exception)); @@ -106,10 +109,20 @@ public CompletableFuture executeAsync(HttpRequest request, Request Objects.requireNonNull(request, "request"); Objects.requireNonNull(requestOptions, "requestOptions"); + // publishOn(boundedElastic) moves signal delivery off the reactor-http-nio + // thread before MonoToCompletableFuture completes the future. The OpenAI SDK + // chains .thenApply callbacks that synchronously parse the response body + // (JSON handler for non-stream, SSE parser for stream); both call + // AzureHttpResponseAdapter.body() which pulls from the underlying + // Flux via BlockingIterable. BlockingIterable rejects iteration + // from non-blocking threads, so without this hop the whole chain fails with + // "Iterating over a toIterable() / toStream() is blocking". return Mono.fromCallable(() -> buildAzureRequest(request)) - .flatMap(azureRequest -> this.httpPipeline.send(azureRequest, buildRequestContext(requestOptions))) + .flatMap(prepared -> this.httpPipeline.send(prepared.azureRequest, + buildRequestContext(requestOptions, prepared.isStreaming))) .map(response -> (HttpResponse) new AzureHttpResponseAdapter(response)) .onErrorMap(HttpClientWrapper::mapAzureExceptionToOpenAI) + .publishOn(Schedulers.boundedElastic()) .toFuture(); } @@ -188,18 +201,50 @@ private static Headers toOpenAIHeaders(HttpHeaders azureHeaders) { } /** - * Converts the OpenAI request metadata and body into an Azure {@link com.azure.core.http.HttpRequest}. 
+ * Pattern that matches a top-level {@code "stream": true} JSON field, allowing for + * optional whitespace around the colon. This is used to detect streaming requests + * so the Azure pipeline does not eagerly buffer the SSE response body. */ - private static com.azure.core.http.HttpRequest buildAzureRequest(HttpRequest request) - throws MalformedURLException { + private static final Pattern STREAM_TRUE_PATTERN = Pattern.compile("\"stream\"\\s*:\\s*true"); + + /** + * Holds the converted Azure request together with a flag indicating whether the + * original OpenAI request is a streaming request. + */ + static final class PreparedRequest { + final com.azure.core.http.HttpRequest azureRequest; + final boolean isStreaming; + + PreparedRequest(com.azure.core.http.HttpRequest azureRequest, boolean isStreaming) { + this.azureRequest = azureRequest; + this.isStreaming = isStreaming; + } + } + + /** + * Converts the OpenAI request metadata and body into an Azure {@link com.azure.core.http.HttpRequest} + * and determines whether the request is a streaming request. + */ + private static PreparedRequest buildAzureRequest(HttpRequest request) throws MalformedURLException { HttpRequestBody requestBody = request.body(); String contentType = requestBody != null ? requestBody.contentType() : null; BinaryData bodyData = null; + boolean isStreaming = false; if (requestBody != null && requestBody.contentLength() != 0) { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); requestBody.writeTo(outputStream); - bodyData = BinaryData.fromBytes(outputStream.toByteArray()); + byte[] bodyBytes = outputStream.toByteArray(); + bodyData = BinaryData.fromBytes(bodyBytes); + + // Detect streaming requests by checking for "stream": true in the JSON body. + // When the OpenAI SDK calls createStreaming(), it serializes stream=true into the + // request body. Streaming responses use SSE (Server-Sent Events) and must NOT be + // eagerly buffered by the Azure HTTP pipeline. 
+ if (contentType != null && contentType.contains("json") && bodyBytes.length > 0) { + String bodyStr = new String(bodyBytes, StandardCharsets.UTF_8); + isStreaming = STREAM_TRUE_PATTERN.matcher(bodyStr).find(); + } } HttpHeaders headers = toAzureHeaders(request.headers()); @@ -214,7 +259,7 @@ private static com.azure.core.http.HttpRequest buildAzureRequest(HttpRequest req azureRequest.setBody(bodyData); } - return azureRequest; + return new PreparedRequest(azureRequest, isStreaming); } /** @@ -236,11 +281,18 @@ private static HttpHeaders toAzureHeaders(Headers sourceHeaders) { /** * Builds the request context from the given request options. + * + *
<p>
When {@code isStreaming} is {@code true}, the {@code azure-eagerly-read-response} + * context flag is set to {@code false} so the Azure HTTP pipeline returns a live + * streaming body instead of buffering the entire response into memory. This is + * required for SSE (Server-Sent Events) streaming to deliver events incrementally.
</p>
+ * * @param requestOptions OpenAI SDK request options + * @param isStreaming whether the request is a streaming request * @return Azure request {@link Context} */ - private static Context buildRequestContext(RequestOptions requestOptions) { - Context context = new Context("azure-eagerly-read-response", true); + private static Context buildRequestContext(RequestOptions requestOptions, boolean isStreaming) { + Context context = new Context("azure-eagerly-read-response", !isStreaming); Timeout timeout = requestOptions.getTimeout(); // we use "read" as it's the closest thing to the "response timeout" if (timeout != null && !timeout.read().isZero() && !timeout.read().isNegative()) { diff --git a/sdk/ai/azure-ai-agents/src/test/java/com/azure/ai/agents/implementation/http/HttpClientHelperTests.java b/sdk/ai/azure-ai-agents/src/test/java/com/azure/ai/agents/implementation/http/HttpClientHelperTests.java index bf3b9a4bd77b..0a0c45da9650 100644 --- a/sdk/ai/azure-ai-agents/src/test/java/com/azure/ai/agents/implementation/http/HttpClientHelperTests.java +++ b/sdk/ai/azure-ai-agents/src/test/java/com/azure/ai/agents/implementation/http/HttpClientHelperTests.java @@ -26,6 +26,7 @@ import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -160,6 +161,187 @@ void executeAsyncPropagatesHttpClientFailures() { assertEquals("Network error", cause.getMessage()); } + // ======================================================================== + // Streaming detection tests — verifies fix for issue #48726 + // ======================================================================== + + @Test + void nonStreamingRequestSetsEagerlyReadToTrue() { + ContextCapturingHttpClient capturingClient = new 
ContextCapturingHttpClient(); + com.openai.core.http.HttpClient openAiClient + = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build()); + + // Non-streaming JSON body (no "stream":true) + String body = "{\"model\":\"gpt-4o\",\"input\":\"Hello\"}"; + com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder() + .method(com.openai.core.http.HttpMethod.POST) + .baseUrl("https://example.com") + .addPathSegment("responses") + .body(new TestHttpRequestBody(body, "application/json")) + .build(); + + try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) { + assertEquals(200, response.statusCode()); + } + + Context capturedContext = capturingClient.getLastContext(); + assertNotNull(capturedContext, "Context should have been captured"); + Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null); + assertTrue((Boolean) eagerlyRead, "Non-streaming requests should have azure-eagerly-read-response=true"); + } + + @Test + void streamingRequestSetsEagerlyReadToFalse() { + ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient(); + com.openai.core.http.HttpClient openAiClient + = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build()); + + // Streaming JSON body (contains "stream":true) + String body = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":true}"; + com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder() + .method(com.openai.core.http.HttpMethod.POST) + .baseUrl("https://example.com") + .addPathSegment("responses") + .body(new TestHttpRequestBody(body, "application/json")) + .build(); + + try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) { + assertEquals(200, response.statusCode()); + } + + Context capturedContext = capturingClient.getLastContext(); + assertNotNull(capturedContext, 
"Context should have been captured"); + Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null); + assertFalse((Boolean) eagerlyRead, "Streaming requests should have azure-eagerly-read-response=false"); + } + + @Test + void streamingRequestWithWhitespaceSetsEagerlyReadToFalse() { + ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient(); + com.openai.core.http.HttpClient openAiClient + = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build()); + + // Streaming JSON body with spaces around the colon + String body = "{\"model\":\"gpt-4o\", \"stream\" : true, \"input\":\"Hello\"}"; + com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder() + .method(com.openai.core.http.HttpMethod.POST) + .baseUrl("https://example.com") + .addPathSegment("responses") + .body(new TestHttpRequestBody(body, "application/json")) + .build(); + + try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) { + assertEquals(200, response.statusCode()); + } + + Context capturedContext = capturingClient.getLastContext(); + assertNotNull(capturedContext, "Context should have been captured"); + Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null); + assertFalse((Boolean) eagerlyRead, + "Streaming requests with whitespace around colon should have azure-eagerly-read-response=false"); + } + + @Test + void streamingAsyncRequestSetsEagerlyReadToFalse() { + ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient(); + com.openai.core.http.HttpClient openAiClient + = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build()); + + // Streaming JSON body + String body = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":true}"; + com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder() + 
.method(com.openai.core.http.HttpMethod.POST) + .baseUrl("https://example.com") + .addPathSegment("responses") + .body(new TestHttpRequestBody(body, "application/json")) + .build(); + + CompletableFuture future = openAiClient.executeAsync(openAiRequest); + try (com.openai.core.http.HttpResponse response = future.join()) { + assertEquals(200, response.statusCode()); + } + + Context capturedContext = capturingClient.getLastContext(); + assertNotNull(capturedContext, "Context should have been captured"); + Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null); + assertFalse((Boolean) eagerlyRead, "Async streaming requests should have azure-eagerly-read-response=false"); + } + + @Test + void nonJsonBodySetsEagerlyReadToTrue() { + ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient(); + com.openai.core.http.HttpClient openAiClient + = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build()); + + // Non-JSON body that happens to contain "stream":true text + com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder() + .method(com.openai.core.http.HttpMethod.POST) + .baseUrl("https://example.com") + .body(new TestHttpRequestBody("stream\":true", "text/plain")) + .build(); + + try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) { + assertEquals(200, response.statusCode()); + } + + Context capturedContext = capturingClient.getLastContext(); + assertNotNull(capturedContext, "Context should have been captured"); + Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null); + assertTrue((Boolean) eagerlyRead, "Non-JSON bodies should always have azure-eagerly-read-response=true"); + } + + @Test + void streamFalseInBodySetsEagerlyReadToTrue() { + ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient(); + com.openai.core.http.HttpClient openAiClient + = 
HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build()); + + // JSON body with "stream":false + String body = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":false}"; + com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder() + .method(com.openai.core.http.HttpMethod.POST) + .baseUrl("https://example.com") + .addPathSegment("responses") + .body(new TestHttpRequestBody(body, "application/json")) + .build(); + + try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) { + assertEquals(200, response.statusCode()); + } + + Context capturedContext = capturingClient.getLastContext(); + assertNotNull(capturedContext, "Context should have been captured"); + Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null); + assertTrue((Boolean) eagerlyRead, "Requests with stream=false should have azure-eagerly-read-response=true"); + } + + @Test + void noBodySetsEagerlyReadToTrue() { + ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient(); + com.openai.core.http.HttpClient openAiClient + = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build()); + + // GET request with no body + com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder() + .method(com.openai.core.http.HttpMethod.GET) + .baseUrl("https://example.com") + .addPathSegment("test") + .build(); + + try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) { + assertEquals(200, response.statusCode()); + } + + Context capturedContext = capturingClient.getLastContext(); + assertNotNull(capturedContext, "Context should have been captured"); + Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null); + assertTrue((Boolean) eagerlyRead, "Requests without a body should have azure-eagerly-read-response=true"); + } + + // 
======================================================================== + // Test helpers + // ======================================================================== + private static com.openai.core.http.HttpRequest createOpenAiRequest() { return com.openai.core.http.HttpRequest.builder() .method(com.openai.core.http.HttpMethod.POST) @@ -219,6 +401,35 @@ int getSendCount() { } } + /** + * HTTP client that captures the Context passed to send(), allowing tests to verify + * context flags like azure-eagerly-read-response. + */ + private static final class ContextCapturingHttpClient implements HttpClient { + private Context lastContext; + + @Override + public Mono send(HttpRequest request) { + return send(request, Context.NONE); + } + + @Override + public Mono send(HttpRequest request, Context context) { + this.lastContext = context; + return Mono.just(createMockResponse(request, 200, new HttpHeaders(), "{}")); + } + + @Override + public HttpResponse sendSync(HttpRequest request, Context context) { + this.lastContext = context; + return createMockResponse(request, 200, new HttpHeaders(), "{}"); + } + + Context getLastContext() { + return lastContext; + } + } + private static final class TestHttpRequestBody implements HttpRequestBody { private final byte[] content; private final String contentType;