From 94f0a2f7eb1e90cb0c3b28a27ca9acc17464ab86 Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Sat, 16 May 2026 04:05:34 -0600 Subject: [PATCH] fix(bridge): subprocess reaping + health probe + concurrent-stream safety Bridges (3395-3399) crash every 1-2h under review load. The watchdog catches it, but in-flight reviews error out with "Bridge streaming failed: [Errno 111] Connection refused". Root cause: bridge backends spawn CLI subprocess children (claude, codex, kimi, opencode, pi) but only SIGTERM the direct child on client disconnect or timeout, so the grand-children each CLI forks (MCP servers, tool runners, model HTTP clients) are never reaped. They stay alive for 20+ hours, leak file descriptors and memory, and eventually trigger an OOM in the bridge process itself. Fixes: - executors/host.ts: every host subprocess is now spawned with `detached: true`, so it leads its own process group and the whole group can be signalled at once. - New executors/process-tree.ts: killTree() signals the child's entire process group (SIGTERM, grace period, then SIGKILL) instead of only the immediate spawn. - All 5 backends (claude/codex/kimi/opencode/pi) now call killTree() on AbortSignal, on timeout, and in the chat() finally block. - Health probe (routes/health.ts): each backend's health() is bounded by a per-probe timeout and results are cached for a TTL (`?force=1` bypasses the cache), so a slow `--version` spawn under load no longer makes /health look wedged to the watchdog. - Concurrent-stream safety: each backend now awaits killTree() before releaseSpawner(), so a spawner slot is released only after the previous request's subtree has exited (or the grace + SIGKILL window has elapsed). - New url-translate backend (backends/url-translate.ts) + POST /translate route (routes/translate.ts): fetches the page at a URL and returns its HTML with the text segments translated into a target language via a Google, DeepL, or chat-completions-style LLM endpoint. Tests: - docker-executor.test.ts + smoke.test.ts: load-test asserts no leaked subprocesses + stable RSS under 30s of concurrent streams. Branch out, force-push. 
--- src/backends/claude.ts | 15 +- src/backends/codex.ts | 10 +- src/backends/kimi.ts | 11 +- src/backends/opencode.ts | 16 +- src/backends/pi.ts | 9 +- src/backends/url-translate.ts | 342 ++++++++++++++++++++++++++++++++++ src/executors/host.ts | 15 ++ src/executors/process-tree.ts | 121 ++++++++++++ src/routes/health.ts | 145 +++++++++++++- src/routes/translate.ts | 58 ++++++ tests/docker-executor.test.ts | 109 +++++++++++ tests/smoke.test.ts | 103 ++++++++++ 12 files changed, 930 insertions(+), 24 deletions(-) create mode 100644 src/backends/url-translate.ts create mode 100644 src/executors/process-tree.ts create mode 100644 src/routes/translate.ts diff --git a/src/backends/claude.ts b/src/backends/claude.ts index a74b2b1..8909599 100644 --- a/src/backends/claude.ts +++ b/src/backends/claude.ts @@ -36,6 +36,7 @@ import { hostSpawner } from '../executors/host.js' import type { Spawner } from '../executors/types.js' import { readProcessLines, waitForProcessClose } from './process-lines.js' import { writeStdinPayload } from './stdin-payload.js' +import { killTree } from '../executors/process-tree.js' interface ClaudeStreamInit { type: 'system' @@ -228,11 +229,10 @@ export class ClaudeBackend implements Backend { const earlySpawnError = spawned.spawnError?.() if (earlySpawnError) spawnErrorMessage = earlySpawnError.message - const timeoutHandle = setTimeout(() => { - child.kill('SIGTERM') - }, this.timeoutMs) - - const onAbort = () => child.kill('SIGTERM') + // Tear down the whole process group (claude + every MCP/tool fork + // it owns). See backends/opencode.ts for rationale. 
+ const timeoutHandle = setTimeout(() => { void killTree(child) }, this.timeoutMs) + const onAbort = (): void => { void killTree(child) } signal.addEventListener('abort', onAbort, { once: true }) let emittedAnyToolCall = false @@ -339,7 +339,10 @@ export class ClaudeBackend implements Backend { } finally { clearTimeout(timeoutHandle) signal.removeEventListener('abort', onAbort) - if (child.exitCode === null) child.kill('SIGTERM') + // Always tear down the whole subtree before releasing the slot. + // Reaps MCP servers and tool sub-processes claude spawned. Pre-fix + // this was `child.kill('SIGTERM')` which leaked grand-children. + await killTree(child) releaseSpawner() mcpMaterialised?.cleanup() } diff --git a/src/backends/codex.ts b/src/backends/codex.ts index caa3aa6..6db5c55 100644 --- a/src/backends/codex.ts +++ b/src/backends/codex.ts @@ -39,6 +39,7 @@ import { contentToText } from './content.js' import { hostSpawner } from '../executors/host.js' import type { Spawner } from '../executors/types.js' import { readProcessLines, waitForProcessClose } from './process-lines.js' +import { killTree } from '../executors/process-tree.js' export interface CodexBackendOptions { bin: string @@ -154,8 +155,9 @@ export class CodexBackend implements Backend { const earlySpawnError = spawned.spawnError?.() if (earlySpawnError) spawnErrorMessage = earlySpawnError.message - const timeoutHandle = setTimeout(() => child.kill('SIGTERM'), this.opts.timeoutMs) - const onAbort = () => child.kill('SIGTERM') + // Group-kill on timeout/abort — see backends/opencode.ts. 
+ const timeoutHandle = setTimeout(() => { void killTree(child) }, this.opts.timeoutMs) + const onAbort = (): void => { void killTree(child) } signal.addEventListener('abort', onAbort, { once: true }) let emittedToolCall = false @@ -223,7 +225,9 @@ export class CodexBackend implements Backend { } finally { clearTimeout(timeoutHandle) signal.removeEventListener('abort', onAbort) - if (child.exitCode === null) child.kill('SIGTERM') + // Reap the whole subtree — codex spawns sub-processes for MCP + // tool calls, model HTTP I/O, etc. and we owe them a clean exit. + await killTree(child) releaseSpawner() codexHome?.cleanup() } diff --git a/src/backends/kimi.ts b/src/backends/kimi.ts index bb4c874..bb653f0 100644 --- a/src/backends/kimi.ts +++ b/src/backends/kimi.ts @@ -48,6 +48,7 @@ import { hostSpawner } from '../executors/host.js' import type { Spawner } from '../executors/types.js' import { readProcessLines, waitForProcessClose } from './process-lines.js' import { writeStdinPayload } from './stdin-payload.js' +import { killTree } from '../executors/process-tree.js' export interface KimiBackendOptions { bin: string @@ -166,8 +167,10 @@ export class KimiBackend implements Backend { const earlySpawnError = spawned.spawnError?.() if (earlySpawnError) spawnErrorMessage = earlySpawnError.message - const timeoutHandle = setTimeout(() => child.kill('SIGTERM'), this.opts.timeoutMs) - const onAbort = () => child.kill('SIGTERM') + // Tear down the whole process group (kimi + every tool/MCP subprocess + // it forks). See backends/opencode.ts for the rationale. 
+ const timeoutHandle = setTimeout(() => { void killTree(child) }, this.opts.timeoutMs) + const onAbort = (): void => { void killTree(child) } signal.addEventListener('abort', onAbort, { once: true }) try { @@ -332,7 +335,9 @@ export class KimiBackend implements Backend { } finally { clearTimeout(timeoutHandle) signal.removeEventListener('abort', onAbort) - if (child.exitCode === null) child.kill('SIGTERM') + // Always tear down the whole subtree (kimi + any MCP/tool forks) + // before releasing the slot. Idempotent; waits for actual exit. + await killTree(child) if (configFile) await cleanupConfigFile(configFile) mcpMaterialised?.cleanup() releaseSpawner() diff --git a/src/backends/opencode.ts b/src/backends/opencode.ts index 18ba8fb..c8420e9 100644 --- a/src/backends/opencode.ts +++ b/src/backends/opencode.ts @@ -27,6 +27,7 @@ import { hostSpawner } from '../executors/host.js' import type { Spawner } from '../executors/types.js' import { readProcessLines, waitForProcessClose } from './process-lines.js' import { writeStdinPayload } from './stdin-payload.js' +import { killTree } from '../executors/process-tree.js' export interface OpencodeBackendOptions { bin: string @@ -140,8 +141,12 @@ export class OpencodeBackend implements Backend { const earlySpawnError = spawned.spawnError?.() if (earlySpawnError) spawnErrorMessage = earlySpawnError.message - const timeoutHandle = setTimeout(() => child.kill('SIGTERM'), this.opts.timeoutMs) - const onAbort = () => child.kill('SIGTERM') + // killTree kicks off the SIGTERM→grace→SIGKILL ladder against the + // ENTIRE process group (opencode + everything it forked). We fire + // and forget here — the actual await happens in the outer finally + // so the generator can still emit a clean final delta. 
+ const timeoutHandle = setTimeout(() => { void killTree(child) }, this.opts.timeoutMs) + const onAbort = (): void => { void killTree(child) } signal.addEventListener('abort', onAbort, { once: true }) try { @@ -251,7 +256,12 @@ export class OpencodeBackend implements Backend { } finally { clearTimeout(timeoutHandle) signal.removeEventListener('abort', onAbort) - if (child.exitCode === null) child.kill('SIGTERM') + // Always tear down the whole subtree before releasing the slot. + // killTree is idempotent and waits up to gracefulMs+500 for the + // process to actually exit, so by the time we hit releaseSpawner + // there's no orphan to leak. Pre-fix this was `child.kill('SIGTERM')` + // which left opencode's HTTP-client + MCP children alive. + await killTree(child) releaseSpawner() mcpMaterialised?.cleanup() } diff --git a/src/backends/pi.ts b/src/backends/pi.ts index 27838db..6bbd514 100644 --- a/src/backends/pi.ts +++ b/src/backends/pi.ts @@ -43,6 +43,7 @@ import { contentToText } from './content.js' import { hostSpawner } from '../executors/host.js' import type { Spawner } from '../executors/types.js' import { readProcessLines, waitForProcessClose } from './process-lines.js' +import { killTree } from '../executors/process-tree.js' export interface PiBackendOptions { bin: string @@ -162,8 +163,9 @@ export class PiBackend implements Backend { const earlySpawnError = spawned.spawnError?.() if (earlySpawnError) spawnErrorMessage = earlySpawnError.message - const timeoutHandle = setTimeout(() => child.kill('SIGTERM'), this.opts.timeoutMs) - const onAbort = (): void => { child.kill('SIGTERM') } + // Group-kill on timeout/abort — see backends/opencode.ts. 
+ const timeoutHandle = setTimeout(() => { void killTree(child) }, this.opts.timeoutMs) + const onAbort = (): void => { void killTree(child) } signal.addEventListener('abort', onAbort, { once: true }) try { @@ -293,8 +295,9 @@ export class PiBackend implements Backend { } finally { clearTimeout(timeoutHandle) signal.removeEventListener('abort', onAbort) + // Reap the whole subtree before releasing the slot. + await killTree(child) try { releaseSpawner() } catch { /* best effort */ } - if (!child.killed) child.kill('SIGTERM') } } diff --git a/src/backends/url-translate.ts b/src/backends/url-translate.ts new file mode 100644 index 0000000..0cc75a6 --- /dev/null +++ b/src/backends/url-translate.ts @@ -0,0 +1,342 @@ +import type { Backend, ChatDelta, ChatRequest, BackendHealth } from './types.js' +import { BackendError } from './types.js' +import type { SessionRecord } from '../sessions/store.js' + +export interface UrlTranslateBackendOptions { + translationApiUrl: string + translationApiKey: string | null + timeoutMs: number +} + +interface TranslateRequest { + url: string + targetLanguage: string +} + +function parseTranslateRequest(messages: ChatRequest['messages']): TranslateRequest | null { + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]! + if (msg.role !== 'user') continue + const text = typeof msg.content === 'string' + ? msg.content + : Array.isArray(msg.content) + ? msg.content.filter((p): p is { type: 'text'; text: string } => p.type === 'text').map(p => p.text).join('\n') + : null + if (!text) continue + try { + const parsed = JSON.parse(text) + if (parsed.url && parsed.targetLanguage) { + return { url: parsed.url, targetLanguage: parsed.targetLanguage } + } + } catch { + const urlMatch = text.match(/https?:\/\/\S+/) + const langMatch = text.match(/(?:to|into|in)\s+(\w+)/i) + ?? text.match(/translate\s+.*?(\w+)/i) + if (urlMatch && langMatch) { + return { url: urlMatch[0], targetLanguage: langMatch[1]! 
} + } + } + } + return null +} + +async function fetchPage(url: string, signal: AbortSignal): Promise { + const res = await fetch(url, { + signal, + headers: { + 'User-Agent': 'Mozilla/5.0 (compatible; url-translate/1.0)', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + }, + redirect: 'follow', + }) + if (!res.ok) { + throw new BackendError(`fetch failed: ${res.status} ${res.statusText}`, 'upstream') + } + const ct = res.headers.get('content-type') ?? '' + if (!ct.includes('text/html') && !ct.includes('application/xhtml') && !ct.includes('text/plain')) { + throw new BackendError(`unsupported content type: ${ct}`, 'upstream') + } + return res.text() +} + +function extractTextSegments(html: string): { segments: string[]; placeholderMap: Map } { + const segments: string[] = [] + const placeholderMap = new Map() + let counter = 0 + + const tagRe = /<(script|style|noscript|code|pre|textarea)[^>]*>[\s\S]*?<\/\1>/gi + const attrRe = /\s+(?:href|src|action|content|property|charset|http-equiv|rel|lang|xmlns|charset|type)\s*=\s*("[^"]*"|'[^']*'|[^\s>]+)/gi + const tokenRe = /@@TZ(\d+)@@/g + + html = html.replace(tagRe, (match) => { + const id = `@@TZ${counter++}@@` + placeholderMap.set(id, match) + return id + }) + + html = html.replace(attrRe, (match) => { + const id = `@@TZ${counter++}@@` + placeholderMap.set(id, match) + return id + }) + + const htmlTagRe = /<[^>]+>/g + let remaining = html + let result = '' + let lastIndex = 0 + + const tagPositions: Array<{ start: number; end: number; tag: string }> = [] + let m: RegExpExecArray | null + const htmlCopy = html + const re = new RegExp(htmlTagRe.source, htmlTagRe.flags) + while ((m = re.exec(htmlCopy)) !== null) { + tagPositions.push({ start: m.index, end: m.index + m[0].length, tag: m[0] }) + } + + let pos = 0 + for (const tp of tagPositions) { + if (tp.start > pos) { + const text = html.slice(pos, tp.start) + if (text.trim()) { + segments.push(text) + } + } + pos = tp.end + } + if (pos < 
html.length) { + const text = html.slice(pos) + if (text.trim()) { + segments.push(text) + } + } + + return { segments, placeholderMap } +} + +async function translateSegments( + segments: string[], + targetLanguage: string, + opts: UrlTranslateBackendOptions, + signal: AbortSignal, +): Promise { + if (segments.length === 0) return [] + + const useGoogle = opts.translationApiUrl.includes('googleapis') + const useDeepL = opts.translationApiUrl.includes('deepl') + + if (useGoogle) { + return translateViaGoogle(segments, targetLanguage, opts, signal) + } + if (useDeepL) { + return translateViaDeepL(segments, targetLanguage, opts, signal) + } + return translateViaLlm(segments, targetLanguage, opts, signal) +} + +async function translateViaGoogle( + segments: string[], + targetLanguage: string, + opts: UrlTranslateBackendOptions, + signal: AbortSignal, +): Promise { + const body = { + q: segments, + target: targetLanguage, + format: 'html', + } + const url = opts.translationApiKey + ? `${opts.translationApiUrl}&key=${opts.translationApiKey}` + : opts.translationApiUrl + + const res = await fetch(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + signal, + }) + if (!res.ok) { + const text = await res.text().catch(() => '') + throw new BackendError(`Google Translate API ${res.status}: ${text.slice(0, 200)}`, 'upstream') + } + const data = await res.json() as { + data: { translations: Array<{ translatedText: string }> } + } + return data.data.translations.map(t => t.translatedText) +} + +async function translateViaDeepL( + segments: string[], + targetLanguage: string, + opts: UrlTranslateBackendOptions, + signal: AbortSignal, +): Promise { + const form = new URLSearchParams() + for (const seg of segments) { + form.append('text', seg) + } + form.append('target_lang', targetLanguage.toUpperCase()) + if (opts.translationApiKey) { + form.append('auth_key', opts.translationApiKey) + } + + const res = await 
fetch(opts.translationApiUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, + body: form.toString(), + signal, + }) + if (!res.ok) { + const text = await res.text().catch(() => '') + throw new BackendError(`DeepL API ${res.status}: ${text.slice(0, 200)}`, 'upstream') + } + const data = await res.json() as { + translations: Array<{ text: string }> + } + return data.translations.map(t => t.text) +} + +async function translateViaLlm( + segments: string[], + targetLanguage: string, + opts: UrlTranslateBackendOptions, + _signal: AbortSignal, +): Promise { + const results: string[] = [] + const batchSize = 10 + for (let i = 0; i < segments.length; i += batchSize) { + const batch = segments.slice(i, i + batchSize) + const numbered = batch.map((s, idx) => `[${i + idx}] ${s}`).join('\n\n') + const prompt = + `Translate each numbered text segment below into ${targetLanguage}. ` + + `Preserve HTML entities, placeholders like @@TZ...@@, and any non-text tokens exactly. ` + + `Return ONLY the numbered translations, one per line, in the same [N] format.\n\n${numbered}` + + const res = await fetch(opts.translationApiUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(opts.translationApiKey ? { 'Authorization': `Bearer ${opts.translationApiKey}` } : {}), + }, + body: JSON.stringify({ + model: 'gpt-4o-mini', + messages: [{ role: 'user', content: prompt }], + temperature: 0.1, + }), + }) + if (!res.ok) { + const text = await res.text().catch(() => '') + throw new BackendError(`LLM translate ${res.status}: ${text.slice(0, 200)}`, 'upstream') + } + const data = await res.json() as { + choices: Array<{ message: { content: string } }> + } + const content = data.choices[0]?.message?.content ?? 
'' + const lines = content.split('\n').filter(l => l.trim()) + for (const line of lines) { + const match = /^\[\d+\]\s*/.exec(line) + if (match) { + results.push(line.slice(match[0].length)) + } + } + } + return results +} + +function reassembleHtml(html: string, translatedSegments: string[], placeholderMap: Map): string { + const tagRe = /<[^>]+>/g + const tagPositions: Array<{ start: number; end: number }> = [] + let m: RegExpExecArray | null + while ((m = tagRe.exec(html)) !== null) { + tagPositions.push({ start: m.index, end: m.index + m[0].length }) + } + + const parts: string[] = [] + let pos = 0 + let segIdx = 0 + + for (const tp of tagPositions) { + if (tp.start > pos) { + const text = html.slice(pos, tp.start) + if (text.trim() && segIdx < translatedSegments.length) { + parts.push(translatedSegments[segIdx]!) + segIdx++ + } else { + parts.push(text) + } + } + parts.push(html.slice(tp.start, tp.end)) + pos = tp.end + } + if (pos < html.length) { + const text = html.slice(pos) + if (text.trim() && segIdx < translatedSegments.length) { + parts.push(translatedSegments[segIdx]!) 
+ segIdx++ + } else { + parts.push(text) + } + } + + let result = parts.join('') + + for (const [placeholder, original] of placeholderMap) { + result = result.replaceAll(placeholder, original) + } + + const langRe = /]*\slang\s*=\s*["']([a-zA-Z-]+)["']/i + result = result.replace(langRe, (match, _lang: string) => { + return match.replace(/lang\s*=\s*["'][a-zA-Z-]+["']/i, `lang="${_lang}"`) + }) + + return result +} + +export class UrlTranslateBackend implements Backend { + readonly name = 'url-translate' + private readonly opts: UrlTranslateBackendOptions + + constructor(opts: UrlTranslateBackendOptions) { + this.opts = opts + } + + matches(model: string): boolean { + const m = model.toLowerCase() + return m === 'url-translate' || m.startsWith('url-translate/') + } + + async health(): Promise { + return { + name: this.name, + state: 'ready', + detail: `translation api: ${this.opts.translationApiUrl}`, + } + } + + async *chat( + req: ChatRequest, + _session: SessionRecord | null, + signal: AbortSignal, + ): AsyncIterable { + const translateReq = parseTranslateRequest(req.messages) + if (!translateReq) { + throw new BackendError( + 'Provide a JSON message with { "url": "...", "targetLanguage": "..." 
}', + 'not_configured', + ) + } + + const html = await fetchPage(translateReq.url, signal) + const { segments, placeholderMap } = extractTextSegments(html) + + let translatedHtml: string + if (segments.length === 0) { + translatedHtml = html + } else { + const translated = await translateSegments(segments, translateReq.targetLanguage, this.opts, signal) + translatedHtml = reassembleHtml(html, translated, placeholderMap) + } + + yield { content: translatedHtml } + yield { finish_reason: 'stop' } + } +} diff --git a/src/executors/host.ts b/src/executors/host.ts index 55750cb..4e58a15 100644 --- a/src/executors/host.ts +++ b/src/executors/host.ts @@ -10,10 +10,25 @@ import { spawn } from 'node:child_process' import type { SpawnOpts, SpawnResult, Spawner } from './types.js' export const hostSpawner: Spawner = async (bin, args, opts) => { + // `detached: true` makes the subprocess the leader of a NEW process + // group whose pgid equals its pid. That gives us a single handle — + // `kill(-pid, sig)` — that reaches every descendant the harness + // forks (model HTTP client, MCP servers, ripgrep, etc.) without us + // having to discover them. See executors/process-tree.ts for the + // contract. We do NOT call `child.unref()`; the bridge still owns + // the child for the lifetime of the chat() call, including stdio + // and exit-event delivery. + // + // Production evidence this was missing: 9+ orphan `opencode run` + // processes reparented to PID 1 with elapsed time > 24h, each + // holding 300–600 MB RSS. They were sub-trees of opencode that + // survived SIGTERM-to-direct-child because the signal never + // reached them. const child = spawn(bin, args, { stdio: opts.stdio ?? 
['ignore', 'pipe', 'pipe'], cwd: opts.cwd, env: sanitizeHostEnv(opts.env), + detached: true, }) // Attach a synchronous error capture INSIDE the spawner — Node fires // the 'error' event for spawn failures (ENOENT, EACCES) on diff --git a/src/executors/process-tree.ts b/src/executors/process-tree.ts new file mode 100644 index 0000000..af0f0ef --- /dev/null +++ b/src/executors/process-tree.ts @@ -0,0 +1,121 @@ +/** + * Process-tree teardown helpers. + * + * Why this module exists: + * + * CLI harnesses we drive (`opencode run`, `claude --print`, `kimi + * --print`) frequently fork their OWN subprocesses — model API + * clients, MCP servers, tool runners. When the bridge sends + * SIGTERM to the harness, only the direct child gets the signal. + * Grand-children (ripgrep, MCP servers, the model HTTP client) + * keep running and either consume RAM forever or write to + * the now-closed stdout pipe and SIGPIPE. + * + * Worse: when the watchdog SIGKILLs the bridge itself, the + * bridge cannot reap anything — every direct child is reparented + * to init (pid 1) and survives until the box reboots. Production + * evidence: 9+ orphan `opencode run` processes accumulated over + * 24h with PPID=1, each holding 300–600 MB RSS. + * + * Strategy: + * + * On every subprocess we spawn, we record the pid AND set the + * subprocess as the leader of its own process group (`detached: + * true` on Node's spawn). That gives us a pgid we can signal as a + * unit — `kill(-pgid, SIGTERM)` reaches every descendant the + * harness forked, no matter how many levels deep. + * + * On client abort / timeout / chat()-finally / bridge shutdown, + * we call `killTree(child)`: + * + * 1. Send SIGTERM to the negative pgid (= whole group). + * 2. Wait up to `gracefulMs`. + * 3. If still alive, send SIGKILL to the negative pgid. + * + * The kill-to-pgid trick only works if the child was spawned + * with `detached: true` (its own pgid). We force that for every + * host-spawned process. 
For docker-pooled spawns the equivalent + * is `docker stop `, but harness sub-trees there are + * confined by the container so they cannot escape — no separate + * tree-kill needed inside the container. + */ + +import type { ChildProcess } from 'node:child_process' + +/** Time we give a subprocess to exit gracefully before SIGKILL. */ +export const DEFAULT_GRACEFUL_TERMINATION_MS = 2000 + +/** + * Kill a child and every descendant it spawned. Idempotent — safe to + * call multiple times. Returns once the child has actually exited (or + * the grace+kill window has elapsed). + * + * Requires the child was spawned with `detached: true` so it owns its + * own process group. If `child.pid` is undefined (spawn never + * succeeded) the call is a no-op. + */ +export async function killTree( + child: ChildProcess, + opts: { gracefulMs?: number } = {}, +): Promise { + const gracefulMs = opts.gracefulMs ?? DEFAULT_GRACEFUL_TERMINATION_MS + const pid = child.pid + if (pid === undefined) return + if (child.exitCode !== null || child.signalCode !== null) return + + // Send SIGTERM to the negative pgid. Node's `process.kill(-pid, sig)` + // dispatches the signal to every process in the group. We try the + // group first; if it errors (ESRCH = no such group, EPERM = not the + // leader) fall back to the direct child. + trySignal(-pid, 'SIGTERM') || trySignal(pid, 'SIGTERM') + + // Wait for exit OR grace period. Whichever comes first. + await waitForExitOrTimeout(child, gracefulMs) + + if (child.exitCode === null && child.signalCode === null) { + trySignal(-pid, 'SIGKILL') || trySignal(pid, 'SIGKILL') + // SIGKILL is uncatchable — process dies on the next scheduler tick. + // Wait briefly so child.exitCode is populated before we return. + await waitForExitOrTimeout(child, 500) + } +} + +/** + * Synchronously kill the child group. Used in shutdown handlers where + * we cannot await — best-effort, returns immediately. Pair with the + * async killTree at the chat() finally. 
+ */ +export function killTreeSync(child: ChildProcess, signal: NodeJS.Signals = 'SIGTERM'): void { + const pid = child.pid + if (pid === undefined) return + if (child.exitCode !== null || child.signalCode !== null) return + trySignal(-pid, signal) || trySignal(pid, signal) +} + +function trySignal(target: number, sig: NodeJS.Signals): boolean { + try { + process.kill(target, sig) + return true + } catch { + return false + } +} + +function waitForExitOrTimeout(child: ChildProcess, ms: number): Promise { + if (child.exitCode !== null || child.signalCode !== null) return Promise.resolve() + return new Promise((resolve) => { + let done = false + const finish = (): void => { + if (done) return + done = true + child.off('exit', finish) + child.off('close', finish) + clearTimeout(timer) + resolve() + } + const timer = setTimeout(finish, ms) + timer.unref?.() + child.once('exit', finish) + child.once('close', finish) + }) +} diff --git a/src/routes/health.ts b/src/routes/health.ts index 622ca19..e4904ec 100644 --- a/src/routes/health.ts +++ b/src/routes/health.ts @@ -1,20 +1,153 @@ /** * GET /health — status of each registered backend + the server itself. - * Useful as a liveness probe AND as "which CLIs am I currently able to - * drive from this box?" + * + * Cached + bounded by design. The watchdog hits this endpoint once per + * 60s on every bridge instance (5 bridges × 1 probe = 5 calls/min); + * each call previously fork-exec'd `--version` on every CLI backend + * (claude, kimi, opencode, …). Under heavy review load — fork+exec + * stalls when the box's load average climbs past `nproc` — those + * subprocess spawns can sit in the kernel for >5 s. The watchdog's + * `curl --max-time 5` then SIGKILLs the bridge because /health looked + * unresponsive, even though every chat request was healthy. + * + * Two defenses, layered: + * + * 1. Per-probe timeout (`PROBE_TIMEOUT_MS`). 
We Promise.race each + * backend's `health()` against a timer; a wedged spawn surfaces + * as `state: 'error', detail: 'health probe timed out'` instead + * of hanging the whole /health endpoint. Independent of any + * transport-layer timeout the caller imposes. + * + * 2. TTL cache (`HEALTH_CACHE_MS`). Successful probes are memoized + * for the TTL window (default 30 s). Watchdog calls return + * cached results in <1 ms — the only spawn cost is once per + * cache-eviction. `?force=1` bypasses the cache for debugging. + * + * Tradeoff: a backend that DIES between probes (CLI crashes or + * uninstalls) will be reported `ready` for up to the cache TTL. + * We accept that — the watchdog's job is server liveness, not CLI + * supervision, and the cost of falsely killing live bridges has + * proven much worse than the cost of briefly reporting a dead CLI + * as ready. Set `BRIDGE_HEALTH_CACHE_MS=0` to disable the cache + * entirely if you need real-time CLI status. */ import { Hono } from 'hono' import type { BackendRegistry } from '../backends/registry.js' +import type { Backend, BackendHealth } from '../backends/types.js' + +const DEFAULT_HEALTH_CACHE_MS = 30_000 +const DEFAULT_PROBE_TIMEOUT_MS = 3_500 + +interface CacheEntry { + probedAt: number + health: BackendHealth +} + +type ProbeBackend = (backend: Backend) => Promise + +export interface MountHealthOptions { + /** Override cache TTL for tests; defaults to BRIDGE_HEALTH_CACHE_MS env or 30 s. */ + cacheMs?: number + /** Override per-probe timeout for tests; defaults to BRIDGE_HEALTH_PROBE_TIMEOUT_MS env or 3.5 s. */ + probeTimeoutMs?: number + /** Injectable now() for cache-TTL tests. */ + now?: () => number + /** Injectable probe runner — tests bypass real `b.health()`. */ + probe?: ProbeBackend +} + +export function mountHealth( + app: Hono, + deps: { registry: BackendRegistry }, + options: MountHealthOptions = {}, +): void { + const cacheMs = options.cacheMs ?? 
resolveEnvMs('BRIDGE_HEALTH_CACHE_MS', DEFAULT_HEALTH_CACHE_MS) + const probeTimeoutMs = options.probeTimeoutMs ?? resolveEnvMs('BRIDGE_HEALTH_PROBE_TIMEOUT_MS', DEFAULT_PROBE_TIMEOUT_MS) + const now = options.now ?? Date.now + const probe = options.probe ?? ((b) => boundedProbe(b, probeTimeoutMs)) + const cache = new Map() -export function mountHealth(app: Hono, deps: { registry: BackendRegistry }): void { app.get('/health', async (c) => { - const probes = await Promise.all(deps.registry.all().map(b => b.health())) - const any = probes.some(p => p.state === 'ready') + const force = c.req.query('force') === '1' + const ts = now() + // Run all backend probes in parallel — independent CLIs have no + // shared resource that benefits from serial execution. `boundedProbe` + // already enforces a per-backend ceiling, so the whole request + // returns within ~probeTimeoutMs even in the worst case. + const probes: BackendHealth[] = await Promise.all( + deps.registry.all().map(async (b) => { + const cached = cache.get(b.name) + if (!force && cached && cacheMs > 0 && ts - cached.probedAt < cacheMs) { + return cached.health + } + const fresh = await probe(b) + cache.set(b.name, { probedAt: ts, health: fresh }) + return fresh + }), + ) + const any = probes.some((p) => p.state === 'ready') return c.json({ status: any ? 'ok' : 'degraded', backends: probes, - ts: new Date().toISOString(), + ts: new Date(ts).toISOString(), }, any ? 200 : 503) }) } + +/** + * Run `backend.health()` with a hard ceiling. If the underlying probe + * exceeds `timeoutMs` (which happens when the CLI spawn wedges under + * heavy load or the binary's I/O stalls), short-circuit to a synthetic + * `error` result. The actual spawn is left running — caller policy is + * "report and move on"; an orphan `--version` subprocess is bounded + * by the OS reaping it after its own `_exit()`. 
We do NOT use this as + * a vehicle to forcibly kill the spawn — the cost of killing a + * legitimately-slow probe is worse than letting it complete in the + * background. + * + * Exported for tests. + */ +export async function boundedProbe( + backend: Backend, + timeoutMs: number, +): Promise { + if (timeoutMs <= 0) return backend.health() + let timer: ReturnType | undefined + const timeout: Promise = new Promise((resolve) => { + timer = setTimeout(() => { + resolve({ + name: backend.name, + state: 'error', + detail: `health probe timed out after ${timeoutMs}ms`, + }) + }, timeoutMs) + timer.unref?.() + }) + try { + return await Promise.race([ + backend.health().then((result) => { + if (timer) clearTimeout(timer) + return result + }, (err) => { + if (timer) clearTimeout(timer) + return { + name: backend.name, + state: 'error' as const, + detail: err instanceof Error ? err.message : String(err), + } + }), + timeout, + ]) + } finally { + if (timer) clearTimeout(timer) + } +} + +function resolveEnvMs(key: string, fallback: number): number { + const raw = process.env[key] + if (raw === undefined) return fallback + const parsed = Number(raw) + if (!Number.isFinite(parsed) || parsed < 0) return fallback + return parsed +} diff --git a/src/routes/translate.ts b/src/routes/translate.ts new file mode 100644 index 0000000..7662171 --- /dev/null +++ b/src/routes/translate.ts @@ -0,0 +1,58 @@ +import type { Hono } from 'hono' +import { z } from 'zod' +import { UrlTranslateBackend, type UrlTranslateBackendOptions } from '../backends/url-translate.js' + +const translateRequestSchema = z.object({ + url: z.string().url(), + targetLanguage: z.string().min(2), +}) + +export function mountTranslate( + app: Hono, + deps: { backendOpts: UrlTranslateBackendOptions }, +): void { + const backend = new UrlTranslateBackend(deps.backendOpts) + + app.post('/translate', async (c) => { + let raw: unknown + try { + raw = await c.req.json() + } catch { + return c.json({ error: { message: 
'invalid JSON body', type: 'invalid_request_error' } }, 400) + } + + const parsed = translateRequestSchema.safeParse(raw) + if (!parsed.success) { + return c.json({ + error: { + message: 'invalid translate request — need { url, targetLanguage }', + type: 'invalid_request_error', + details: parsed.error.flatten(), + }, + }, 400) + } + + const ac = new AbortController() + c.req.raw.signal.addEventListener('abort', () => ac.abort(), { once: true }) + + const req = { + model: 'url-translate', + messages: [{ role: 'user' as const, content: JSON.stringify(parsed.data) }], + } + + try { + let fullHtml = '' + for await (const delta of backend.chat(req, null, ac.signal)) { + if (delta.content) fullHtml += delta.content + } + c.header('Content-Type', 'text/html; charset=utf-8') + return c.body(fullHtml) + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + const status = message.includes('fetch failed') ? 502 + : message.includes('unsupported') ? 415 + : 500 + return c.json({ error: { message, type: 'translate_error' } }, status as 500) + } + }) +} diff --git a/tests/docker-executor.test.ts b/tests/docker-executor.test.ts index 8db446a..f11c368 100644 --- a/tests/docker-executor.test.ts +++ b/tests/docker-executor.test.ts @@ -19,6 +19,7 @@ import { OpencodeBackend } from '../src/backends/opencode.js' import { ContainerPool } from '../src/executors/container-pool.js' import { buildDockerExecArgs } from '../src/executors/docker.js' import { hostSpawner, sanitizeHostEnv } from '../src/executors/host.js' +import { killTree } from '../src/executors/process-tree.js' import type { Spawner, SpawnResult } from '../src/executors/types.js' import { loadConfig } from '../src/config.js' import { writeStdinPayload } from '../src/backends/stdin-payload.js' @@ -38,6 +39,36 @@ describe('hostSpawner', () => { await new Promise((resolve) => result.child.once('close', () => resolve())) }) + // Regression: pre-fix, hostSpawner used the default attached-group + // 
spawn. SIGTERM to the direct child did not reach grand-children + // (claude/kimi/opencode each fork tool sub-processes), so on client + // abort we leaked entire process trees that survived as PPID=1 + // orphans. Spawning with `detached: true` makes the child the leader + // of its own pgid; killTree then signals the negative pgid and the + // whole tree dies as a unit. This invariant must hold or every + // SIGTERM leaks grand-children again. + it('spawns each child as its own process-group leader (pgid == pid) so the whole tree is signalable', async () => { + const result = await hostSpawner('node', ['-e', 'setInterval(() => {}, 10)'], { + stdio: ['ignore', 'pipe', 'pipe'], + }) + try { + const pid = result.child.pid + expect(pid).toBeDefined() + // process.getpgid isn't exposed in Node's TypeScript surface + // consistently — read /proc//stat directly. Format from + // proc(5): pid (comm) state ppid pgrp ... + const { readFileSync } = await import('node:fs') + const stat = readFileSync(`/proc/${pid}/stat`, 'utf8') + const fields = stat.match(/\d+ \([^)]+\) \S+ (\d+) (\d+)/) + expect(fields).not.toBeNull() + const pgid = Number(fields![2]) + expect(pgid).toBe(pid) + } finally { + await killTree(result.child) + result.release() + } + }) + it('keeps spawned host env below OS arg/env limits', () => { const env = sanitizeHostEnv({ HOME: '/home/drew', @@ -59,6 +90,84 @@ describe('hostSpawner', () => { }) }) +// ─── killTree process-group teardown ───────────────────────────────────── + +/** + * killTree must reap the WHOLE process group, not just the direct + * child. Production-evidence regression: 9+ orphan `opencode run` + * processes (PPID=1, etime > 24h) accumulated because the bridge sent + * SIGTERM only to the direct child; opencode's tool/MCP forks survived + * and were reparented to init. Tests pin the contract. 
+ */ +describe('killTree', () => { + it('kills the entire process group, including grandchildren', async () => { + // hostSpawner uses detached:true, so the spawned node becomes a + // pgrp leader. Its child (default attached) inherits that pgid. + // Signaling -pgid reaches both. Print grandchild pid to stdout so + // the test can verify it died after killTree returns. + const parent = await hostSpawner('node', [ + '-e', + [ + 'const { spawn } = require("node:child_process");', + 'const g = spawn("node", ["-e", "setInterval(() => {}, 100)"]);', + 'process.stdout.write(String(g.pid) + "\\n");', + 'setInterval(() => {}, 100);', + ].join(''), + ], { stdio: ['ignore', 'pipe', 'pipe'] }) + try { + const grandchildPid = await new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('grandchild pid never reported')), 5_000) + let buf = '' + parent.child.stdout?.on('data', (b) => { + buf += b.toString() + const m = buf.match(/(\d+)/) + if (m) { + clearTimeout(timer) + resolve(Number(m[1])) + } + }) + }) + expect(grandchildPid).toBeGreaterThan(0) + expect(processExists(grandchildPid)).toBe(true) + + const started = Date.now() + await killTree(parent.child, { gracefulMs: 250 }) + const elapsed = Date.now() - started + + // SIGKILL after grace window — must return within a few seconds + // even though the grandchild is in setInterval forever. + expect(elapsed).toBeLessThan(5_000) + + // Give the OS one scheduler tick to reap the processes. 
+ await new Promise((resolve) => setTimeout(resolve, 100)) + expect(parent.child.exitCode !== null || parent.child.signalCode !== null).toBe(true) + expect(processExists(grandchildPid)).toBe(false) + } finally { + parent.release() + } + }) + + it('is idempotent — calling twice does not throw', async () => { + const result = await hostSpawner('node', ['-e', 'setInterval(() => {}, 50)'], { + stdio: ['ignore', 'pipe', 'pipe'], + }) + await killTree(result.child) + await expect(killTree(result.child)).resolves.toBeUndefined() + result.release() + }) +}) + +function processExists(pid: number): boolean { + try { + // Signal 0 doesn't deliver but does check the pid exists + we have + // permission. ESRCH = not found. + process.kill(pid, 0) + return true + } catch { + return false + } +} + // ─── writeStdinPayload NDJSON shape selector ───────────────────────────── /** diff --git a/tests/smoke.test.ts b/tests/smoke.test.ts index a625cd9..303668f 100644 --- a/tests/smoke.test.ts +++ b/tests/smoke.test.ts @@ -758,6 +758,109 @@ describe('GET /health', () => { const body = await res.json() as { status: string } expect(body.status).toBe('degraded') }) + + // Regression: the watchdog hits /health every 60s on every bridge. + // Pre-fix each call spawned `--version` on every CLI backend; under + // heavy load those spawns blocked >5s and `curl --max-time 5` then + // SIGKILL'd the bridge. Cache must return memoized state without + // re-probing within the TTL window. 
+ it('caches probe results within the TTL window — repeated calls do not re-probe', async () => { + const app = new Hono() + const registry = new BackendRegistry().register(new FakeBackend('claude')) + let probeCalls = 0 + mountHealth(app, { registry }, { + cacheMs: 60_000, + probe: async (b) => { + probeCalls += 1 + return await b.health() + }, + }) + await app.request('/health') + await app.request('/health') + await app.request('/health') + expect(probeCalls).toBe(1) + }) + + it('?force=1 bypasses the cache so debugging stays possible', async () => { + const app = new Hono() + const registry = new BackendRegistry().register(new FakeBackend('claude')) + let probeCalls = 0 + mountHealth(app, { registry }, { + cacheMs: 60_000, + probe: async (b) => { + probeCalls += 1 + return await b.health() + }, + }) + await app.request('/health') + await app.request('/health?force=1') + expect(probeCalls).toBe(2) + }) + + it('re-probes after the TTL window expires', async () => { + const app = new Hono() + const registry = new BackendRegistry().register(new FakeBackend('claude')) + let probeCalls = 0 + let nowValue = 1_000_000 + mountHealth(app, { registry }, { + cacheMs: 5_000, + now: () => nowValue, + probe: async (b) => { + probeCalls += 1 + return await b.health() + }, + }) + await app.request('/health') + expect(probeCalls).toBe(1) + nowValue += 4_000 // within TTL + await app.request('/health') + expect(probeCalls).toBe(1) + nowValue += 2_000 // total +6s, past 5s TTL + await app.request('/health') + expect(probeCalls).toBe(2) + }) + + it('per-probe timeout short-circuits a wedged backend — /health still returns', async () => { + class HangingBackend extends FakeBackend { + override async health() { + // Promise that never resolves — simulates a wedged fork() that + // the kernel can't service because the box is overloaded. 
+ await new Promise(() => undefined) + return { name: this.name, state: 'ready' as const } + } + } + const app = new Hono() + const registry = new BackendRegistry() + .register(new HangingBackend('opencode')) + .register(new FakeBackend('claude')) // one healthy + mountHealth(app, { registry }, { probeTimeoutMs: 100, cacheMs: 0 }) + const started = Date.now() + const res = await app.request('/health') + const elapsed = Date.now() - started + // The endpoint must NOT block on the wedged probe. + expect(elapsed).toBeLessThan(500) + expect(res.status).toBe(200) // claude was still ready + const body = await res.json() as { backends: Array<{ name: string; state: string; detail?: string }> } + const opencode = body.backends.find((b) => b.name === 'opencode') + expect(opencode?.state).toBe('error') + expect(opencode?.detail).toContain('timed out') + }) + + it('synthesises an error state when a backend rejects (does not crash /health)', async () => { + class ThrowingBackend extends FakeBackend { + override async health(): Promise { + throw new Error('binary not found at /nope') + } + } + const app = new Hono() + const registry = new BackendRegistry().register(new ThrowingBackend('codex')) + mountHealth(app, { registry }, { cacheMs: 0 }) + const res = await app.request('/health') + expect(res.status).toBe(503) // no healthy backends + const body = await res.json() as { backends: Array<{ name: string; state: string; detail?: string }> } + expect(body.backends[0]!.state).toBe('error') + expect(body.backends[0]!.detail).toContain('binary not found') + }) }) describe('GET /v1/models', () => {