-
Notifications
You must be signed in to change notification settings - Fork 27
feat: OpenAI-compatible streaming hardening (prefill heartbeat + OpenCode e2e CI) #75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -661,7 +661,7 @@ struct MLXServer: AsyncParsableCommand { | |||||||||
| do { | ||||||||||
| let bodyData = try await collectBody(request) | ||||||||||
| return try await handleChatCompletion( | ||||||||||
| bodyData: bodyData, config: config, container: container, semaphore: semaphore, stats: stats, promptCache: promptCache, | ||||||||||
| request: request, bodyData: bodyData, config: config, container: container, semaphore: semaphore, stats: stats, promptCache: promptCache, | ||||||||||
| draftModelRef: draftModelRef, numDraftTokens: numDraftTokensConfig | ||||||||||
| ) | ||||||||||
| } catch { | ||||||||||
|
|
@@ -682,7 +682,7 @@ struct MLXServer: AsyncParsableCommand { | |||||||||
| do { | ||||||||||
| let bodyData = try await collectBody(request) | ||||||||||
| return try await handleTextCompletion( | ||||||||||
| bodyData: bodyData, config: config, container: container, semaphore: semaphore, stats: stats | ||||||||||
| request: request, bodyData: bodyData, config: config, container: container, semaphore: semaphore, stats: stats | ||||||||||
| ) | ||||||||||
| } catch { | ||||||||||
| let errMsg = String(describing: error).replacingOccurrences(of: "\"", with: "'") | ||||||||||
|
|
@@ -1020,6 +1020,7 @@ func collectBody(_ request: Request) async throws -> Data { | |||||||||
| // ── Chat Completions Handler ───────────────────────────────────────────────── | ||||||||||
|
|
||||||||||
| func handleChatCompletion( | ||||||||||
| request: Request, | ||||||||||
| bodyData: Data, | ||||||||||
| config: ServerConfig, | ||||||||||
| container: ModelContainer, | ||||||||||
|
|
@@ -1032,6 +1033,7 @@ func handleChatCompletion( | |||||||||
| let chatReq = try JSONDecoder().decode(ChatCompletionRequest.self, from: bodyData) | ||||||||||
| let isStream = chatReq.stream ?? false | ||||||||||
| let jsonMode = chatReq.responseFormat?.type == "json_object" | ||||||||||
| let emitPrefillProgress = prefillProgressEnabled(in: request) | ||||||||||
|
|
||||||||||
| // ── Merge per-request overrides with CLI defaults ── | ||||||||||
| let tokenLimit = chatReq.maxTokens ?? config.maxTokens | ||||||||||
|
|
@@ -1284,7 +1286,8 @@ func handleChatCompletion( | |||||||||
| stream: stream, modelId: modelId, stopSequences: stopSequences, | ||||||||||
| includeUsage: includeUsage, promptTokenCount: promptTokenCount, | ||||||||||
| enableThinking: enableThinking, jsonMode: jsonMode, semaphore: semaphore, | ||||||||||
| stats: stats, genStart: genStart, prefillStart: prefillStart, onPrefillDone: onPrefillDone | ||||||||||
| stats: stats, genStart: genStart, prefillStart: prefillStart, | ||||||||||
| emitPrefillProgress: emitPrefillProgress, onPrefillDone: onPrefillDone | ||||||||||
| ) | ||||||||||
| } else { | ||||||||||
| return try await handleChatNonStreaming( | ||||||||||
|
|
@@ -1365,7 +1368,7 @@ struct ThinkingStateTracker { | |||||||||
| /// Tracks prefill progress: whether it is done, and how many tokens have been processed. | ||||||||||
| /// n_past is updated by activePrefillProgressHook (called from LLMModel.prepare after each chunk) | ||||||||||
| /// and read by the SSE heartbeat task every 2 s. | ||||||||||
| private actor PrefillState { | ||||||||||
| actor PrefillState { | ||||||||||
| private(set) var done: Bool = false | ||||||||||
| private(set) var nPast: Int = 0 | ||||||||||
| func finish() { done = true } | ||||||||||
|
|
@@ -1384,29 +1387,39 @@ func handleChatStreaming( | |||||||||
| stats: ServerStats, | ||||||||||
| genStart: Date, | ||||||||||
| prefillStart: Date, | ||||||||||
| emitPrefillProgress: Bool, | ||||||||||
| onPrefillDone: (() async -> Void)? = nil | ||||||||||
| ) -> Response { | ||||||||||
| let (sseStream, cont) = AsyncStream<String>.makeStream() | ||||||||||
|
|
||||||||||
| // ── Prefill heartbeat: emit llama-server-style slot_update progress every 2 s ── | ||||||||||
| // n_past is updated by activePrefillProgressHook in LLMModel.prepare() after each | ||||||||||
| // 512-token chunk; single-chunk prompts only show elapsed_seconds. | ||||||||||
| let prefillState = PrefillState() | ||||||||||
| activePrefillProgressHook = { nPast, _ in | ||||||||||
| Task { await prefillState.update(nPast: nPast) } | ||||||||||
| } | ||||||||||
| Task { | ||||||||||
| var elapsed = 0 | ||||||||||
| while await !prefillState.done { | ||||||||||
| try? await Task.sleep(for: .seconds(2)) | ||||||||||
| if await !prefillState.done { | ||||||||||
| elapsed += 2 | ||||||||||
| let nPast = await prefillState.nPast | ||||||||||
| _ = cont.yield(ssePrefillChunk( | ||||||||||
| modelId: modelId, | ||||||||||
| nPast: nPast, | ||||||||||
| promptTokens: promptTokenCount, | ||||||||||
| elapsedSeconds: elapsed)) | ||||||||||
| // ── Prefill heartbeat (opt-in via X-SwiftLM-Prefill-Progress: true) ── | ||||||||||
| // We capture the hook in a local variable so that concurrent requests | ||||||||||
| // cannot clobber each other's hook via the global. The global is still | ||||||||||
| // written here because LLMModel.prepare() reads it, but the semaphore | ||||||||||
| // ensures only one generation runs at a time. | ||||||||||
| var heartbeatTask: Task<Void, Never>? = nil | ||||||||||
| activePrefillProgressHook = nil | ||||||||||
| if emitPrefillProgress { | ||||||||||
| // Hook is scoped to this request: the local prefillState is the only | ||||||||||
| // shared state, and it is actor-isolated. | ||||||||||
| activePrefillProgressHook = { nPast, _ in | ||||||||||
| Task { await prefillState.update(nPast: nPast) } | ||||||||||
| } | ||||||||||
| heartbeatTask = Task { | ||||||||||
| var elapsed = 0 | ||||||||||
| while await !prefillState.done { | ||||||||||
| try? await Task.sleep(for: .seconds(2)) | ||||||||||
| // Guard against Task cancellation on client disconnect. | ||||||||||
| guard !Task.isCancelled else { break } | ||||||||||
| if await !prefillState.done { | ||||||||||
| elapsed += 2 | ||||||||||
| let nPast = await prefillState.nPast | ||||||||||
| _ = cont.yield(ssePrefillChunk( | ||||||||||
| nPast: nPast, | ||||||||||
| promptTokens: promptTokenCount, | ||||||||||
| elapsedSeconds: elapsed)) | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
@@ -1436,7 +1449,9 @@ func handleChatStreaming( | |||||||||
| } | ||||||||||
| // Signal first token β stops the prefill heartbeat task | ||||||||||
| if firstToken { | ||||||||||
| // First decode token: stop heartbeat and clear the prefill progress hook | ||||||||||
| // First decode token: cancel heartbeat and clear the prefill progress hook. | ||||||||||
| heartbeatTask?.cancel() | ||||||||||
| heartbeatTask = nil | ||||||||||
| activePrefillProgressHook = nil | ||||||||||
| await prefillState.finish() | ||||||||||
| let prefillDur = Date().timeIntervalSince(prefillStart) | ||||||||||
|
|
@@ -1526,6 +1541,8 @@ func handleChatStreaming( | |||||||||
| toolCallIndex += 1 | ||||||||||
|
|
||||||||||
| case .info(let info): | ||||||||||
| heartbeatTask?.cancel() | ||||||||||
| heartbeatTask = nil | ||||||||||
| activePrefillProgressHook = nil | ||||||||||
| await prefillState.finish() | ||||||||||
|
Comment on lines +1544 to 1547
|
||||||||||
| heartbeatTask?.cancel() | |
| heartbeatTask = nil | |
| activePrefillProgressHook = nil | |
| await prefillState.finish() |
Copilot
AI
Apr 22, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same issue as chat streaming: heartbeatTask/activePrefillProgressHook cleanup only happens on first .chunk or .info. If the streaming task is cancelled before those (client disconnect during prefill), the heartbeat can leak and the global hook may stay set. Add unconditional cleanup (e.g., defer/cancellation handler) so resources are always released when the streaming task exits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`activePrefillProgressHook` is still a single global, but the server supports `--parallel` slots (`AsyncSemaphore(limit: parallelSlots)`). With `parallel > 1`, concurrent requests will overwrite/clear each other's hook (e.g., `activePrefillProgressHook = nil` on line 1402), so prefill updates can be routed to the wrong request or dropped entirely. To actually harden concurrency, scope the hook per-request (e.g., TaskLocal / request-id keyed registry) or otherwise make the `prepare()` callback non-global; relying on the semaphore only works when `parallel == 1`.