Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.openai.client.okhttp.OpenAIOkHttpClient;
import com.openai.client.okhttp.OpenAIOkHttpClientAsync;
import com.openai.credential.BearerTokenCredential;
import com.openai.credential.Credential;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -208,6 +209,49 @@ public AgentsClientBuilder credential(TokenCredential tokenCredential) {
return this;
}

/*
 * Custom header name carrying the API key (e.g. "api-key", "Ocp-Apim-Subscription-Key").
 * Assigned by {@link #apiKey(String, String)}; remains null when API-key auth is not configured.
 */
private String apiKeyHeaderName;

/*
 * API key value sent on every request via {@link #apiKeyHeaderName}. When set, bearer-token
 * authentication is bypassed and callers do not need to supply a {@link TokenCredential}.
 * Always assigned together with {@link #apiKeyHeaderName} in {@link #apiKey(String, String)}.
 */
private String apiKey;

/**
 * Enables API-key authentication using the Azure OpenAI convention's default header name,
 * {@code api-key}.
 *
 * <p>Equivalent to calling {@code apiKey("api-key", apiKey)}.</p>
 *
 * @param apiKey the API key value
 * @return the {@link AgentsClientBuilder}.
 */
public AgentsClientBuilder apiKey(String apiKey) {
    return this.apiKey("api-key", apiKey);
}

/**
 * Enables API-key authentication with a caller-chosen header name, for deployments where the
 * agents endpoint sits behind a gateway expecting a custom auth header (for example
 * Azure API Management's {@code Ocp-Apim-Subscription-Key}).
 *
 * <p>Once API-key auth is configured, {@link #credential(TokenCredential)} is ignored: the
 * OpenAI client receives a placeholder bearer credential, and the real auth header is
 * attached by the Azure pipeline through an {@link AddHeadersPolicy}.</p>
 *
 * @param headerName the header name carrying the API key
 * @param apiKey the API key value
 * @return the {@link AgentsClientBuilder}.
 */
public AgentsClientBuilder apiKey(String headerName, String apiKey) {
    Objects.requireNonNull(headerName, "'headerName' cannot be null.");
    Objects.requireNonNull(apiKey, "'apiKey' value cannot be null. Ensure the API key is properly configured.");

    this.apiKeyHeaderName = headerName;
    this.apiKey = apiKey;

    return this;
}

/*
* The service endpoint
*/
Expand Down Expand Up @@ -306,7 +350,11 @@ private HttpPipeline createHttpPipeline() {
HttpPolicyProviders.addBeforeRetryPolicies(policies);
policies.add(ClientBuilderUtil.validateAndGetRetryPolicy(retryPolicy, retryOptions, new RetryPolicy()));
policies.add(new AddDatePolicy());
if (tokenCredential != null) {
if (apiKey != null) {
HttpHeaders apiKeyHeaders = new HttpHeaders();
apiKeyHeaders.set(apiKeyHeaderName, apiKey);
policies.add(new AddHeadersPolicy(apiKeyHeaders));
} else if (tokenCredential != null) {
policies.add(new BearerTokenAuthenticationPolicy(tokenCredential, DEFAULT_SCOPES));
}
this.pipelinePolicies.stream()
Expand Down Expand Up @@ -366,25 +414,38 @@ public OpenAIClientAsync buildOpenAIAsyncClient() {
}

private OpenAIOkHttpClient.Builder getOpenAIClientBuilder() {
OpenAIOkHttpClient.Builder builder = OpenAIOkHttpClient.builder()
.credential(
BearerTokenCredential.create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES)));
OpenAIOkHttpClient.Builder builder = OpenAIOkHttpClient.builder().credential(resolveOpenAICredential());
builder.baseUrl(this.endpoint + (this.endpoint.endsWith("/") ? "openai/v1" : "/openai/v1"));
// We set the builder retries to 0 to avoid conflicts with the retry policy added through the HttpPipeline.
builder.maxRetries(0);
return builder;
}

private OpenAIOkHttpClientAsync.Builder getOpenAIAsyncClientBuilder() {
OpenAIOkHttpClientAsync.Builder builder = OpenAIOkHttpClientAsync.builder()
.credential(
BearerTokenCredential.create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES)));
OpenAIOkHttpClientAsync.Builder builder
= OpenAIOkHttpClientAsync.builder().credential(resolveOpenAICredential());
builder.baseUrl(this.endpoint + (this.endpoint.endsWith("/") ? "openai/v1" : "/openai/v1"));
// We set the builder retries to 0 to avoid conflicts with the retry policy added through the HttpPipeline.
builder.maxRetries(0);
return builder;
}

// The OpenAI SDK insists on a non-null credential at build time. In API-key mode the
// real auth header is attached by the Azure pipeline (AddHeadersPolicy), so the OpenAI
// client is handed a dummy bearer token the server never evaluates.
private static final String PLACEHOLDER_BEARER_TOKEN = "unused";

/*
 * Picks the credential handed to the OpenAI client builder: a placeholder when API-key
 * auth is configured, a live bearer-token supplier otherwise. Fails fast when neither
 * authentication mechanism has been set up.
 */
private Credential resolveOpenAICredential() {
    if (apiKeyHeaderName != null) {
        // API-key mode: auth happens in the Azure pipeline, not via this credential.
        return BearerTokenCredential.create(PLACEHOLDER_BEARER_TOKEN);
    }
    if (tokenCredential != null) {
        return BearerTokenCredential
            .create(TokenUtils.getBearerTokenSupplier(this.tokenCredential, DEFAULT_SCOPES));
    }
    throw LOGGER.logExceptionAsError(new IllegalStateException(
        "No credential configured: call apiKey() or credential() before building the client."));
}

private static final ClientLogger LOGGER = new ClientLogger(AgentsClientBuilder.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpHeaders;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.openai.core.http.Headers;
import com.openai.core.http.HttpResponse;
import reactor.core.publisher.Flux;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
* Adapter that exposes an Azure {@link com.azure.core.http.HttpResponse} as an OpenAI {@link HttpResponse}. This keeps
Expand Down Expand Up @@ -42,7 +47,43 @@ public Headers headers() {

@Override
public InputStream body() {
    // Deliberately avoid getBodyAsBinaryData().toStream(): that path drains the whole
    // Flux<ByteBuffer> into a byte[] via .block() (FluxByteBufferContent.toStream()),
    // defeating SSE progressive delivery. Pulling chunks lazily lets each one reach
    // the SSE parser the moment it arrives on the wire.
    Flux<ByteBuffer> content = azureResponse.getBody();
    if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
        content = tapLines(content);
    }
    return new FluxByteBufferInputStream(content.toIterable().iterator());
}

/**
 * Taps the body Flux to log each complete {@code \n}-terminated line at VERBOSE. A
 * {@link StringBuilder} buffers partial lines across chunk boundaries so a line
 * split across two {@link ByteBuffer}s is logged once, on arrival of the chunk
 * that completes it.
 *
 * <p>Bytes are decoded with a stateful streaming {@code CharsetDecoder} instead of
 * {@code new String(bytes, UTF_8)} per chunk: a multi-byte UTF-8 character split
 * across two network chunks is carried over in the decoder state and decoded
 * correctly, rather than being logged as replacement characters.</p>
 */
private static Flux<ByteBuffer> tapLines(Flux<ByteBuffer> body) {
    // Stateful decoder: retains a partial multi-byte sequence between chunks.
    java.nio.charset.CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
        .onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE)
        .onUnmappableCharacter(java.nio.charset.CodingErrorAction.REPLACE);
    StringBuilder lineBuffer = new StringBuilder();
    return body.doOnNext(buffer -> {
        // duplicate() so decoding never disturbs the position of the buffer
        // flowing downstream to the actual reader.
        ByteBuffer view = buffer.duplicate();
        // +4 headroom: up to 3 bytes pending from a split sequence may complete here.
        java.nio.CharBuffer chars = java.nio.CharBuffer.allocate(view.remaining() + 4);
        decoder.decode(view, chars, false); // endOfInput=false: more chunks may follow
        chars.flip();
        lineBuffer.append(chars);
        int newline;
        while ((newline = lineBuffer.indexOf("\n")) >= 0) {
            String line = lineBuffer.substring(0, newline);
            lineBuffer.delete(0, newline + 1);
            if (!line.isEmpty()) {
                LOGGER.verbose("SSE line: {}", line);
            }
        }
    }).doOnComplete(() -> {
        if (lineBuffer.length() > 0) {
            LOGGER.verbose("SSE line (trailing, no newline): {}", lineBuffer);
        }
    });
}

@Override
Expand All @@ -60,4 +101,65 @@ private static Headers toOpenAiHeaders(HttpHeaders httpHeaders) {
}
return builder.build();
}

/**
* {@link InputStream} that lazily pulls {@link ByteBuffer} chunks from a reactor
* {@code Flux<ByteBuffer>} iterator. Each {@code hasNext()} parks until the next
* {@code onNext} signal, so reads complete as soon as a chunk arrives on the wire
* rather than waiting for the full response to be buffered.
*/
/**
 * {@link InputStream} that lazily pulls {@link ByteBuffer} chunks from a reactor
 * {@code Flux<ByteBuffer>} iterator. Each {@code hasNext()} parks until the next
 * {@code onNext} signal, so reads complete as soon as a chunk arrives on the wire
 * rather than waiting for the full response to be buffered.
 *
 * <p>Not thread-safe; intended for a single reader, matching how the OpenAI SDK
 * consumes a response body.</p>
 */
private static final class FluxByteBufferInputStream extends InputStream {

    private final Iterator<ByteBuffer> buffers;
    private ByteBuffer current;

    FluxByteBufferInputStream(Iterator<ByteBuffer> buffers) {
        this.buffers = buffers;
    }

    /**
     * Reads a single byte, blocking until a chunk is available.
     *
     * @return the byte as an unsigned int, or -1 at end of stream
     */
    @Override
    public int read() {
        ByteBuffer buffer = nextBuffer();
        if (buffer == null) {
            return -1;
        }
        return buffer.get() & 0xff;
    }

    /**
     * Reads up to {@code length} bytes from the current chunk only.
     *
     * @param out destination array
     * @param offset start offset in {@code out}
     * @param length maximum bytes to transfer
     * @return bytes transferred, 0 for a zero-length request, or -1 at end of stream
     */
    @Override
    public int read(byte[] out, int offset, int length) {
        // InputStream contract: a zero-length read returns 0 immediately. Without
        // this guard we would park on nextBuffer() waiting for the next chunk (or
        // report a spurious -1 at end of stream) for a request of zero bytes.
        if (length == 0) {
            return 0;
        }
        // Return after draining the current buffer. Do NOT loop into nextBuffer()
        // to top up `length` — that would block on the Flux waiting for the next
        // chunk and stall SSE reads until BufferedReader's 8KB request is filled,
        // which defeats progressive delivery.
        ByteBuffer buffer = nextBuffer();
        if (buffer == null) {
            return -1;
        }
        int toTransfer = Math.min(buffer.remaining(), length);
        buffer.get(out, offset, toTransfer);
        return toTransfer;
    }

    /** @return bytes readable without blocking (remaining in the current chunk). */
    @Override
    public int available() {
        ByteBuffer buffer = current;
        return buffer == null ? 0 : buffer.remaining();
    }

    /**
     * Returns the current chunk if it still has bytes, otherwise blocks pulling the
     * iterator until a non-empty chunk arrives; null once the Flux completes.
     */
    private ByteBuffer nextBuffer() {
        if (current != null && current.hasRemaining()) {
            return current;
        }
        while (buffers.hasNext()) {
            ByteBuffer candidate = buffers.next();
            if (candidate.hasRemaining()) {
                // duplicate() so our position/limit mutations stay private to this stream.
                current = candidate.duplicate();
                return current;
            }
        }
        current = null;
        return null;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
import com.openai.errors.UnexpectedStatusCodeException;
import com.openai.errors.UnprocessableEntityException;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.ByteArrayOutputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;

/**
* Utility entry point that adapts an Azure {@link com.azure.core.http.HttpClient} so it can be consumed by
Expand Down Expand Up @@ -87,9 +90,9 @@ public HttpResponse execute(HttpRequest request, RequestOptions requestOptions)
Objects.requireNonNull(requestOptions, "requestOptions");

try {
com.azure.core.http.HttpRequest azureRequest = buildAzureRequest(request);
return new AzureHttpResponseAdapter(
this.httpPipeline.sendSync(azureRequest, buildRequestContext(requestOptions)));
PreparedRequest prepared = buildAzureRequest(request);
return new AzureHttpResponseAdapter(this.httpPipeline.sendSync(prepared.azureRequest,
buildRequestContext(requestOptions, prepared.isStreaming)));
} catch (MalformedURLException exception) {
throw new OpenAIException("Invalid URL in request: " + exception.getMessage(),
LOGGER.logThrowableAsError(exception));
Expand All @@ -106,10 +109,20 @@ public CompletableFuture<HttpResponse> executeAsync(HttpRequest request, Request
Objects.requireNonNull(request, "request");
Objects.requireNonNull(requestOptions, "requestOptions");

// publishOn(boundedElastic) moves signal delivery off the reactor-http-nio
// thread before MonoToCompletableFuture completes the future. The OpenAI SDK
// chains .thenApply callbacks that synchronously parse the response body
// (JSON handler for non-stream, SSE parser for stream); both call
// AzureHttpResponseAdapter.body() which pulls from the underlying
// Flux<ByteBuffer> via BlockingIterable. BlockingIterable rejects iteration
// from non-blocking threads, so without this hop the whole chain fails with
// "Iterating over a toIterable() / toStream() is blocking".
return Mono.fromCallable(() -> buildAzureRequest(request))
.flatMap(azureRequest -> this.httpPipeline.send(azureRequest, buildRequestContext(requestOptions)))
.flatMap(prepared -> this.httpPipeline.send(prepared.azureRequest,
buildRequestContext(requestOptions, prepared.isStreaming)))
.map(response -> (HttpResponse) new AzureHttpResponseAdapter(response))
.onErrorMap(HttpClientWrapper::mapAzureExceptionToOpenAI)
.publishOn(Schedulers.boundedElastic())
.toFuture();
}

Expand Down Expand Up @@ -188,18 +201,50 @@ private static Headers toOpenAIHeaders(HttpHeaders azureHeaders) {
}

/**
* Converts the OpenAI request metadata and body into an Azure {@link com.azure.core.http.HttpRequest}.
* Pattern that matches a top-level {@code "stream": true} JSON field, allowing for
* optional whitespace around the colon. This is used to detect streaming requests
* so the Azure pipeline does not eagerly buffer the SSE response body.
*/
private static com.azure.core.http.HttpRequest buildAzureRequest(HttpRequest request)
throws MalformedURLException {
private static final Pattern STREAM_TRUE_PATTERN = Pattern.compile("\"stream\"\\s*:\\s*true");

/**
 * Pairs the converted Azure request with a flag recording whether the original
 * OpenAI request asked for a streaming (SSE) response, so downstream code can
 * configure the pipeline's response-buffering behavior accordingly.
 */
static final class PreparedRequest {
    final com.azure.core.http.HttpRequest azureRequest;
    final boolean isStreaming;

    PreparedRequest(com.azure.core.http.HttpRequest request, boolean streaming) {
        this.azureRequest = request;
        this.isStreaming = streaming;
    }
}

/**
* Converts the OpenAI request metadata and body into an Azure {@link com.azure.core.http.HttpRequest}
* and determines whether the request is a streaming request.
*/
private static PreparedRequest buildAzureRequest(HttpRequest request) throws MalformedURLException {
HttpRequestBody requestBody = request.body();
String contentType = requestBody != null ? requestBody.contentType() : null;
BinaryData bodyData = null;
boolean isStreaming = false;

if (requestBody != null && requestBody.contentLength() != 0) {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
requestBody.writeTo(outputStream);
bodyData = BinaryData.fromBytes(outputStream.toByteArray());
byte[] bodyBytes = outputStream.toByteArray();
bodyData = BinaryData.fromBytes(bodyBytes);

// Detect streaming requests by checking for "stream": true in the JSON body.
// When the OpenAI SDK calls createStreaming(), it serializes stream=true into the
// request body. Streaming responses use SSE (Server-Sent Events) and must NOT be
// eagerly buffered by the Azure HTTP pipeline.
if (contentType != null && contentType.contains("json") && bodyBytes.length > 0) {
String bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
isStreaming = STREAM_TRUE_PATTERN.matcher(bodyStr).find();
}
}

HttpHeaders headers = toAzureHeaders(request.headers());
Expand All @@ -214,7 +259,7 @@ private static com.azure.core.http.HttpRequest buildAzureRequest(HttpRequest req
azureRequest.setBody(bodyData);
}

return azureRequest;
return new PreparedRequest(azureRequest, isStreaming);
}

/**
Expand All @@ -236,11 +281,18 @@ private static HttpHeaders toAzureHeaders(Headers sourceHeaders) {

/**
* Builds the request context from the given request options.
*
* <p>When {@code isStreaming} is {@code true}, the {@code azure-eagerly-read-response}
* context flag is set to {@code false} so the Azure HTTP pipeline returns a live
* streaming body instead of buffering the entire response into memory. This is
* required for SSE (Server-Sent Events) streaming to deliver events incrementally.</p>
*
* @param requestOptions OpenAI SDK request options
* @param isStreaming whether the request is a streaming request
* @return Azure request {@link Context}
*/
private static Context buildRequestContext(RequestOptions requestOptions) {
Context context = new Context("azure-eagerly-read-response", true);
private static Context buildRequestContext(RequestOptions requestOptions, boolean isStreaming) {
Context context = new Context("azure-eagerly-read-response", !isStreaming);
Timeout timeout = requestOptions.getTimeout();
// we use "read" as it's the closest thing to the "response timeout"
if (timeout != null && !timeout.read().isZero() && !timeout.read().isNegative()) {
Expand Down
Loading