diff --git a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/AgentsClientBuilder.java b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/AgentsClientBuilder.java
index 8d2d19074e02..43826b9359b2 100644
--- a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/AgentsClientBuilder.java
+++ b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/AgentsClientBuilder.java
@@ -41,6 +41,7 @@
import com.openai.client.okhttp.OpenAIOkHttpClient;
import com.openai.client.okhttp.OpenAIOkHttpClientAsync;
import com.openai.credential.BearerTokenCredential;
+import com.openai.credential.Credential;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -208,6 +209,49 @@ public AgentsClientBuilder credential(TokenCredential tokenCredential) {
return this;
}
+ /*
+ * Custom header name carrying the API key (e.g. "api-key", "Ocp-Apim-Subscription-Key").
+ */
+ private String apiKeyHeaderName;
+
+ /*
+ * API key value sent on every request via {@link #apiKeyHeaderName}. When set, bearer-token
+ * authentication is bypassed and callers do not need to supply a {@link TokenCredential}.
+ */
+ private String apiKey;
+
+ /**
+ * Configures API-key authentication using the default {@code api-key} header, which matches
+ * the Azure OpenAI convention.
+ *
+ * @param apiKey the API key value
+ * @return the {@link AgentsClientBuilder}.
+ */
+ public AgentsClientBuilder apiKey(String apiKey) {
+ return apiKey("api-key", apiKey);
+ }
+
+ /**
+ * Configures API-key authentication using a caller-supplied header name. This is used when the
+ * agents endpoint is fronted by a gateway that requires a custom auth header (for example
+ * Azure API Management's {@code Ocp-Apim-Subscription-Key}).
+ *
+ *
+ * When API-key auth is configured, {@link #credential(TokenCredential)} is ignored and the
+ * OpenAI client receives a placeholder bearer credential — auth is applied by the Azure
+ * pipeline via an {@link AddHeadersPolicy}.
+ *
+ * @param headerName the header name carrying the API key
+ * @param apiKey the API key value
+ * @return the {@link AgentsClientBuilder}.
+ */
+ public AgentsClientBuilder apiKey(String headerName, String apiKey) {
+ Objects.requireNonNull(headerName, "'headerName' cannot be null.");
+ Objects.requireNonNull(apiKey, "'apiKey' value cannot be null. Ensure the API key is properly configured.");
+ this.apiKeyHeaderName = headerName;
+ this.apiKey = apiKey;
+ return this;
+ }
+
/*
* The service endpoint
*/
@@ -306,7 +350,11 @@ private HttpPipeline createHttpPipeline() {
HttpPolicyProviders.addBeforeRetryPolicies(policies);
policies.add(ClientBuilderUtil.validateAndGetRetryPolicy(retryPolicy, retryOptions, new RetryPolicy()));
policies.add(new AddDatePolicy());
- if (tokenCredential != null) {
+ if (apiKey != null) {
+ HttpHeaders apiKeyHeaders = new HttpHeaders();
+ apiKeyHeaders.set(apiKeyHeaderName, apiKey);
+ policies.add(new AddHeadersPolicy(apiKeyHeaders));
+ } else if (tokenCredential != null) {
policies.add(new BearerTokenAuthenticationPolicy(tokenCredential, DEFAULT_SCOPES));
}
this.pipelinePolicies.stream()
@@ -366,9 +414,7 @@ public OpenAIClientAsync buildOpenAIAsyncClient() {
}
private OpenAIOkHttpClient.Builder getOpenAIClientBuilder() {
- OpenAIOkHttpClient.Builder builder = OpenAIOkHttpClient.builder()
- .credential(
- BearerTokenCredential.create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES)));
+ OpenAIOkHttpClient.Builder builder = OpenAIOkHttpClient.builder().credential(resolveOpenAICredential());
builder.baseUrl(this.endpoint + (this.endpoint.endsWith("/") ? "openai/v1" : "/openai/v1"));
// We set the builder retries to 0 to avoid conflicts with the retry policy added through the HttpPipeline.
builder.maxRetries(0);
@@ -376,15 +422,30 @@ private OpenAIOkHttpClient.Builder getOpenAIClientBuilder() {
}
private OpenAIOkHttpClientAsync.Builder getOpenAIAsyncClientBuilder() {
- OpenAIOkHttpClientAsync.Builder builder = OpenAIOkHttpClientAsync.builder()
- .credential(
- BearerTokenCredential.create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES)));
+ OpenAIOkHttpClientAsync.Builder builder
+ = OpenAIOkHttpClientAsync.builder().credential(resolveOpenAICredential());
builder.baseUrl(this.endpoint + (this.endpoint.endsWith("/") ? "openai/v1" : "/openai/v1"));
// We set the builder retries to 0 to avoid conflicts with the retry policy added through the HttpPipeline.
builder.maxRetries(0);
return builder;
}
+ // OpenAI SDK requires a non-null credential at build time. For API-key auth the real
+ // auth header is injected by the Azure pipeline (AddHeadersPolicy), so we hand the
+ // OpenAI client a placeholder bearer token that is never used by the server.
+ private static final String PLACEHOLDER_BEARER_TOKEN = "unused";
+
+ private Credential resolveOpenAICredential() {
+ if (apiKeyHeaderName != null) {
+ return BearerTokenCredential.create(PLACEHOLDER_BEARER_TOKEN);
+ }
+ if (tokenCredential == null) {
+ throw LOGGER.logExceptionAsError(new IllegalStateException(
+ "No credential configured: call apiKey() or credential() before building the client."));
+ }
+ return BearerTokenCredential.create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES));
+ }
+
private static final ClientLogger LOGGER = new ClientLogger(AgentsClientBuilder.class);
/**
diff --git a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/AzureHttpResponseAdapter.java b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/AzureHttpResponseAdapter.java
index e299a62a1d48..e7f9991a673b 100644
--- a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/AzureHttpResponseAdapter.java
+++ b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/AzureHttpResponseAdapter.java
@@ -6,10 +6,15 @@
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpHeaders;
import com.azure.core.util.logging.ClientLogger;
+import com.azure.core.util.logging.LogLevel;
import com.openai.core.http.Headers;
import com.openai.core.http.HttpResponse;
+import reactor.core.publisher.Flux;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
/**
* Adapter that exposes an Azure {@link com.azure.core.http.HttpResponse} as an OpenAI {@link HttpResponse}. This keeps
@@ -42,7 +47,43 @@ public Headers headers() {
@Override
public InputStream body() {
- return azureResponse.getBodyAsBinaryData().toStream();
+ // getBodyAsBinaryData().toStream() blocks until the entire Flux
+ // drains (FluxByteBufferContent.toStream() collects into a byte[] via .block()),
+ // which breaks SSE progressive delivery. Iterate the Flux lazily so chunks
+ // reach the SSE parser as they arrive on the wire.
+ Flux<ByteBuffer> body = azureResponse.getBody();
+ if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
+ body = tapLines(body);
+ }
+ return new FluxByteBufferInputStream(body.toIterable().iterator());
+ }
+
+ /**
+ * Taps the body Flux to log each complete {@code \n}-terminated line at VERBOSE. A
+ * {@link StringBuilder} buffers partial lines across chunk boundaries so a line
+ * split across two {@link ByteBuffer}s is logged once, on arrival of the chunk
+ * that completes it.
+ */
+ private static Flux<ByteBuffer> tapLines(Flux<ByteBuffer> body) {
+ StringBuilder lineBuffer = new StringBuilder();
+ return body.doOnNext(buffer -> {
+ ByteBuffer view = buffer.duplicate();
+ byte[] bytes = new byte[view.remaining()];
+ view.get(bytes);
+ lineBuffer.append(new String(bytes, StandardCharsets.UTF_8));
+ int newline;
+ while ((newline = lineBuffer.indexOf("\n")) >= 0) {
+ String line = lineBuffer.substring(0, newline);
+ lineBuffer.delete(0, newline + 1);
+ if (!line.isEmpty()) {
+ LOGGER.verbose("SSE line: {}", line);
+ }
+ }
+ }).doOnComplete(() -> {
+ if (lineBuffer.length() > 0) {
+ LOGGER.verbose("SSE line (trailing, no newline): {}", lineBuffer);
+ }
+ });
}
@Override
@@ -60,4 +101,65 @@ private static Headers toOpenAiHeaders(HttpHeaders httpHeaders) {
}
return builder.build();
}
+
+ /**
+ * {@link InputStream} that lazily pulls {@link ByteBuffer} chunks from a reactor
+ * {@code Flux} iterator. Each {@code hasNext()} parks until the next
+ * {@code onNext} signal, so reads complete as soon as a chunk arrives on the wire
+ * rather than waiting for the full response to be buffered.
+ */
+ private static final class FluxByteBufferInputStream extends InputStream {
+
+ private final Iterator<ByteBuffer> buffers;
+ private ByteBuffer current;
+
+ FluxByteBufferInputStream(Iterator<ByteBuffer> buffers) {
+ this.buffers = buffers;
+ }
+
+ @Override
+ public int read() {
+ ByteBuffer buffer = nextBuffer();
+ if (buffer == null) {
+ return -1;
+ }
+ return buffer.get() & 0xff;
+ }
+
+ @Override
+ public int read(byte[] out, int offset, int length) {
+ // Return after draining the current buffer. Do NOT loop into nextBuffer()
+ // to top up `length` — that would block on the Flux waiting for the next
+ // chunk and stall SSE reads until BufferedReader's 8KB request is filled,
+ // which defeats progressive delivery.
+ ByteBuffer buffer = nextBuffer();
+ if (buffer == null) {
+ return -1;
+ }
+ int toTransfer = Math.min(buffer.remaining(), length);
+ buffer.get(out, offset, toTransfer);
+ return toTransfer;
+ }
+
+ @Override
+ public int available() {
+ ByteBuffer buffer = current;
+ return buffer == null ? 0 : buffer.remaining();
+ }
+
+ private ByteBuffer nextBuffer() {
+ if (current != null && current.hasRemaining()) {
+ return current;
+ }
+ while (buffers.hasNext()) {
+ ByteBuffer candidate = buffers.next();
+ if (candidate.hasRemaining()) {
+ current = candidate.duplicate();
+ return current;
+ }
+ }
+ current = null;
+ return null;
+ }
+ }
}
diff --git a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/HttpClientHelper.java b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/HttpClientHelper.java
index b0060f6301a8..c6e1fa5d4bf3 100644
--- a/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/HttpClientHelper.java
+++ b/sdk/ai/azure-ai-agents/src/main/java/com/azure/ai/agents/implementation/http/HttpClientHelper.java
@@ -30,14 +30,17 @@
import com.openai.errors.UnexpectedStatusCodeException;
import com.openai.errors.UnprocessableEntityException;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
import java.io.ByteArrayOutputStream;
import java.net.MalformedURLException;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
/**
* Utility entry point that adapts an Azure {@link com.azure.core.http.HttpClient} so it can be consumed by
@@ -87,9 +90,9 @@ public HttpResponse execute(HttpRequest request, RequestOptions requestOptions)
Objects.requireNonNull(requestOptions, "requestOptions");
try {
- com.azure.core.http.HttpRequest azureRequest = buildAzureRequest(request);
- return new AzureHttpResponseAdapter(
- this.httpPipeline.sendSync(azureRequest, buildRequestContext(requestOptions)));
+ PreparedRequest prepared = buildAzureRequest(request);
+ return new AzureHttpResponseAdapter(this.httpPipeline.sendSync(prepared.azureRequest,
+ buildRequestContext(requestOptions, prepared.isStreaming)));
} catch (MalformedURLException exception) {
throw new OpenAIException("Invalid URL in request: " + exception.getMessage(),
LOGGER.logThrowableAsError(exception));
@@ -106,10 +109,20 @@ public CompletableFuture<HttpResponse> executeAsync(HttpRequest request, Request
Objects.requireNonNull(request, "request");
Objects.requireNonNull(requestOptions, "requestOptions");
+ // publishOn(boundedElastic) moves signal delivery off the reactor-http-nio
+ // thread before MonoToCompletableFuture completes the future. The OpenAI SDK
+ // chains .thenApply callbacks that synchronously parse the response body
+ // (JSON handler for non-stream, SSE parser for stream); both call
+ // AzureHttpResponseAdapter.body() which pulls from the underlying
+ // Flux via BlockingIterable. BlockingIterable rejects iteration
+ // from non-blocking threads, so without this hop the whole chain fails with
+ // "Iterating over a toIterable() / toStream() is blocking".
return Mono.fromCallable(() -> buildAzureRequest(request))
- .flatMap(azureRequest -> this.httpPipeline.send(azureRequest, buildRequestContext(requestOptions)))
+ .flatMap(prepared -> this.httpPipeline.send(prepared.azureRequest,
+ buildRequestContext(requestOptions, prepared.isStreaming)))
.map(response -> (HttpResponse) new AzureHttpResponseAdapter(response))
.onErrorMap(HttpClientWrapper::mapAzureExceptionToOpenAI)
+ .publishOn(Schedulers.boundedElastic())
.toFuture();
}
@@ -188,18 +201,50 @@ private static Headers toOpenAIHeaders(HttpHeaders azureHeaders) {
}
/**
- * Converts the OpenAI request metadata and body into an Azure {@link com.azure.core.http.HttpRequest}.
+ * Pattern that matches a top-level {@code "stream": true} JSON field, allowing for
+ * optional whitespace around the colon. This is used to detect streaming requests
+ * so the Azure pipeline does not eagerly buffer the SSE response body.
*/
- private static com.azure.core.http.HttpRequest buildAzureRequest(HttpRequest request)
- throws MalformedURLException {
+ private static final Pattern STREAM_TRUE_PATTERN = Pattern.compile("\"stream\"\\s*:\\s*true");
+
+ /**
+ * Holds the converted Azure request together with a flag indicating whether the
+ * original OpenAI request is a streaming request.
+ */
+ static final class PreparedRequest {
+ final com.azure.core.http.HttpRequest azureRequest;
+ final boolean isStreaming;
+
+ PreparedRequest(com.azure.core.http.HttpRequest azureRequest, boolean isStreaming) {
+ this.azureRequest = azureRequest;
+ this.isStreaming = isStreaming;
+ }
+ }
+
+ /**
+ * Converts the OpenAI request metadata and body into an Azure {@link com.azure.core.http.HttpRequest}
+ * and determines whether the request is a streaming request.
+ */
+ private static PreparedRequest buildAzureRequest(HttpRequest request) throws MalformedURLException {
HttpRequestBody requestBody = request.body();
String contentType = requestBody != null ? requestBody.contentType() : null;
BinaryData bodyData = null;
+ boolean isStreaming = false;
if (requestBody != null && requestBody.contentLength() != 0) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
requestBody.writeTo(outputStream);
- bodyData = BinaryData.fromBytes(outputStream.toByteArray());
+ byte[] bodyBytes = outputStream.toByteArray();
+ bodyData = BinaryData.fromBytes(bodyBytes);
+
+ // Detect streaming requests by checking for "stream": true in the JSON body.
+ // When the OpenAI SDK calls createStreaming(), it serializes stream=true into the
+ // request body. Streaming responses use SSE (Server-Sent Events) and must NOT be
+ // eagerly buffered by the Azure HTTP pipeline.
+ if (contentType != null && contentType.contains("json") && bodyBytes.length > 0) {
+ String bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
+ isStreaming = STREAM_TRUE_PATTERN.matcher(bodyStr).find();
+ }
}
HttpHeaders headers = toAzureHeaders(request.headers());
@@ -214,7 +259,7 @@ private static com.azure.core.http.HttpRequest buildAzureRequest(HttpRequest req
azureRequest.setBody(bodyData);
}
- return azureRequest;
+ return new PreparedRequest(azureRequest, isStreaming);
}
/**
@@ -236,11 +281,18 @@ private static HttpHeaders toAzureHeaders(Headers sourceHeaders) {
/**
* Builds the request context from the given request options.
+ *
+ * When {@code isStreaming} is {@code true}, the {@code azure-eagerly-read-response}
+ * context flag is set to {@code false} so the Azure HTTP pipeline returns a live
+ * streaming body instead of buffering the entire response into memory. This is
+ * required for SSE (Server-Sent Events) streaming to deliver events incrementally.
+ *
* @param requestOptions OpenAI SDK request options
+ * @param isStreaming whether the request is a streaming request
* @return Azure request {@link Context}
*/
- private static Context buildRequestContext(RequestOptions requestOptions) {
- Context context = new Context("azure-eagerly-read-response", true);
+ private static Context buildRequestContext(RequestOptions requestOptions, boolean isStreaming) {
+ Context context = new Context("azure-eagerly-read-response", !isStreaming);
Timeout timeout = requestOptions.getTimeout();
// we use "read" as it's the closest thing to the "response timeout"
if (timeout != null && !timeout.read().isZero() && !timeout.read().isNegative()) {
diff --git a/sdk/ai/azure-ai-agents/src/test/java/com/azure/ai/agents/implementation/http/HttpClientHelperTests.java b/sdk/ai/azure-ai-agents/src/test/java/com/azure/ai/agents/implementation/http/HttpClientHelperTests.java
index bf3b9a4bd77b..0a0c45da9650 100644
--- a/sdk/ai/azure-ai-agents/src/test/java/com/azure/ai/agents/implementation/http/HttpClientHelperTests.java
+++ b/sdk/ai/azure-ai-agents/src/test/java/com/azure/ai/agents/implementation/http/HttpClientHelperTests.java
@@ -26,6 +26,7 @@
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -160,6 +161,187 @@ void executeAsyncPropagatesHttpClientFailures() {
assertEquals("Network error", cause.getMessage());
}
+ // ========================================================================
+ // Streaming detection tests — verifies fix for issue #48726
+ // ========================================================================
+
+ @Test
+ void nonStreamingRequestSetsEagerlyReadToTrue() {
+ ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient();
+ com.openai.core.http.HttpClient openAiClient
+ = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build());
+
+ // Non-streaming JSON body (no "stream":true)
+ String body = "{\"model\":\"gpt-4o\",\"input\":\"Hello\"}";
+ com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder()
+ .method(com.openai.core.http.HttpMethod.POST)
+ .baseUrl("https://example.com")
+ .addPathSegment("responses")
+ .body(new TestHttpRequestBody(body, "application/json"))
+ .build();
+
+ try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) {
+ assertEquals(200, response.statusCode());
+ }
+
+ Context capturedContext = capturingClient.getLastContext();
+ assertNotNull(capturedContext, "Context should have been captured");
+ Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null);
+ assertTrue((Boolean) eagerlyRead, "Non-streaming requests should have azure-eagerly-read-response=true");
+ }
+
+ @Test
+ void streamingRequestSetsEagerlyReadToFalse() {
+ ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient();
+ com.openai.core.http.HttpClient openAiClient
+ = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build());
+
+ // Streaming JSON body (contains "stream":true)
+ String body = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":true}";
+ com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder()
+ .method(com.openai.core.http.HttpMethod.POST)
+ .baseUrl("https://example.com")
+ .addPathSegment("responses")
+ .body(new TestHttpRequestBody(body, "application/json"))
+ .build();
+
+ try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) {
+ assertEquals(200, response.statusCode());
+ }
+
+ Context capturedContext = capturingClient.getLastContext();
+ assertNotNull(capturedContext, "Context should have been captured");
+ Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null);
+ assertFalse((Boolean) eagerlyRead, "Streaming requests should have azure-eagerly-read-response=false");
+ }
+
+ @Test
+ void streamingRequestWithWhitespaceSetsEagerlyReadToFalse() {
+ ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient();
+ com.openai.core.http.HttpClient openAiClient
+ = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build());
+
+ // Streaming JSON body with spaces around the colon
+ String body = "{\"model\":\"gpt-4o\", \"stream\" : true, \"input\":\"Hello\"}";
+ com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder()
+ .method(com.openai.core.http.HttpMethod.POST)
+ .baseUrl("https://example.com")
+ .addPathSegment("responses")
+ .body(new TestHttpRequestBody(body, "application/json"))
+ .build();
+
+ try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) {
+ assertEquals(200, response.statusCode());
+ }
+
+ Context capturedContext = capturingClient.getLastContext();
+ assertNotNull(capturedContext, "Context should have been captured");
+ Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null);
+ assertFalse((Boolean) eagerlyRead,
+ "Streaming requests with whitespace around colon should have azure-eagerly-read-response=false");
+ }
+
+ @Test
+ void streamingAsyncRequestSetsEagerlyReadToFalse() {
+ ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient();
+ com.openai.core.http.HttpClient openAiClient
+ = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build());
+
+ // Streaming JSON body
+ String body = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":true}";
+ com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder()
+ .method(com.openai.core.http.HttpMethod.POST)
+ .baseUrl("https://example.com")
+ .addPathSegment("responses")
+ .body(new TestHttpRequestBody(body, "application/json"))
+ .build();
+
+ CompletableFuture<com.openai.core.http.HttpResponse> future = openAiClient.executeAsync(openAiRequest);
+ try (com.openai.core.http.HttpResponse response = future.join()) {
+ assertEquals(200, response.statusCode());
+ }
+
+ Context capturedContext = capturingClient.getLastContext();
+ assertNotNull(capturedContext, "Context should have been captured");
+ Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null);
+ assertFalse((Boolean) eagerlyRead, "Async streaming requests should have azure-eagerly-read-response=false");
+ }
+
+ @Test
+ void nonJsonBodySetsEagerlyReadToTrue() {
+ ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient();
+ com.openai.core.http.HttpClient openAiClient
+ = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build());
+
+ // Non-JSON body that happens to contain "stream":true text
+ com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder()
+ .method(com.openai.core.http.HttpMethod.POST)
+ .baseUrl("https://example.com")
+ .body(new TestHttpRequestBody("stream\":true", "text/plain"))
+ .build();
+
+ try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) {
+ assertEquals(200, response.statusCode());
+ }
+
+ Context capturedContext = capturingClient.getLastContext();
+ assertNotNull(capturedContext, "Context should have been captured");
+ Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null);
+ assertTrue((Boolean) eagerlyRead, "Non-JSON bodies should always have azure-eagerly-read-response=true");
+ }
+
+ @Test
+ void streamFalseInBodySetsEagerlyReadToTrue() {
+ ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient();
+ com.openai.core.http.HttpClient openAiClient
+ = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build());
+
+ // JSON body with "stream":false
+ String body = "{\"model\":\"gpt-4o\",\"input\":\"Hello\",\"stream\":false}";
+ com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder()
+ .method(com.openai.core.http.HttpMethod.POST)
+ .baseUrl("https://example.com")
+ .addPathSegment("responses")
+ .body(new TestHttpRequestBody(body, "application/json"))
+ .build();
+
+ try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) {
+ assertEquals(200, response.statusCode());
+ }
+
+ Context capturedContext = capturingClient.getLastContext();
+ assertNotNull(capturedContext, "Context should have been captured");
+ Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null);
+ assertTrue((Boolean) eagerlyRead, "Requests with stream=false should have azure-eagerly-read-response=true");
+ }
+
+ @Test
+ void noBodySetsEagerlyReadToTrue() {
+ ContextCapturingHttpClient capturingClient = new ContextCapturingHttpClient();
+ com.openai.core.http.HttpClient openAiClient
+ = HttpClientHelper.mapToOpenAIHttpClient(new HttpPipelineBuilder().httpClient(capturingClient).build());
+
+ // GET request with no body
+ com.openai.core.http.HttpRequest openAiRequest = com.openai.core.http.HttpRequest.builder()
+ .method(com.openai.core.http.HttpMethod.GET)
+ .baseUrl("https://example.com")
+ .addPathSegment("test")
+ .build();
+
+ try (com.openai.core.http.HttpResponse response = openAiClient.execute(openAiRequest)) {
+ assertEquals(200, response.statusCode());
+ }
+
+ Context capturedContext = capturingClient.getLastContext();
+ assertNotNull(capturedContext, "Context should have been captured");
+ Object eagerlyRead = capturedContext.getData("azure-eagerly-read-response").orElse(null);
+ assertTrue((Boolean) eagerlyRead, "Requests without a body should have azure-eagerly-read-response=true");
+ }
+
+ // ========================================================================
+ // Test helpers
+ // ========================================================================
+
private static com.openai.core.http.HttpRequest createOpenAiRequest() {
return com.openai.core.http.HttpRequest.builder()
.method(com.openai.core.http.HttpMethod.POST)
@@ -219,6 +401,35 @@ int getSendCount() {
}
}
+ /**
+ * HTTP client that captures the Context passed to send(), allowing tests to verify
+ * context flags like azure-eagerly-read-response.
+ */
+ private static final class ContextCapturingHttpClient implements HttpClient {
+ private Context lastContext;
+
+ @Override
+ public Mono<HttpResponse> send(HttpRequest request) {
+ return send(request, Context.NONE);
+ }
+
+ @Override
+ public Mono<HttpResponse> send(HttpRequest request, Context context) {
+ this.lastContext = context;
+ return Mono.just(createMockResponse(request, 200, new HttpHeaders(), "{}"));
+ }
+
+ @Override
+ public HttpResponse sendSync(HttpRequest request, Context context) {
+ this.lastContext = context;
+ return createMockResponse(request, 200, new HttpHeaders(), "{}");
+ }
+
+ Context getLastContext() {
+ return lastContext;
+ }
+ }
+
private static final class TestHttpRequestBody implements HttpRequestBody {
private final byte[] content;
private final String contentType;