From b030ae9f25d5cc3519a02070f509199f231327f7 Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Fri, 24 Apr 2026 17:12:25 -0600 Subject: [PATCH 1/3] feat(agent): structured output for codex adapter via stdio MCP Codex has no native equivalent of Claude's outputFormat. When a task has json_schema + an onStructuredOutput callback, inject an in-process stdio MCP server exposing a create_output tool that validates against the schema with AJV, and watch for a completed tool_call_update on the ACP stream to fire onStructuredOutput exactly once. Path resolution walks up from import.meta.dirname so the compiled MCP script is found regardless of which bundle entry imports this module (dist/agent.js, dist/server/bin.cjs, harness bundles). Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/agent/package.json | 1 + packages/agent/src/adapters/acp-connection.ts | 1 + .../agent/src/adapters/codex/codex-agent.ts | 138 ++++++++++++++++-- .../agent/src/adapters/codex/codex-client.ts | 62 ++++++++ .../codex/structured-output-mcp-server.ts | 81 ++++++++++ packages/agent/tsup.config.ts | 1 + 6 files changed, 268 insertions(+), 16 deletions(-) create mode 100644 packages/agent/src/adapters/codex/structured-output-mcp-server.ts diff --git a/packages/agent/package.json b/packages/agent/package.json index 47ca765eb..d05687010 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -131,6 +131,7 @@ "hono": "^4.11.7", "jsonwebtoken": "^9.0.2", "minimatch": "^10.0.3", + "@modelcontextprotocol/sdk": "1.29.0", "tar": "^7.5.0", "uuid": "13.0.0", "yoga-wasm-web": "^0.3.3", diff --git a/packages/agent/src/adapters/acp-connection.ts b/packages/agent/src/adapters/acp-connection.ts index 085ca58be..e06e9a507 100644 --- a/packages/agent/src/adapters/acp-connection.ts +++ b/packages/agent/src/adapters/acp-connection.ts @@ -205,6 +205,7 @@ function createCodexConnection(config: AcpConnectionConfig): AcpConnection { codexProcessOptions: config.codexOptions ?? {}, processCallbacks: config.processCallbacks, posthogApiConfig: resolveEnricherApiConfig(config), + onStructuredOutput: config.onStructuredOutput, }); return agent; }, agentStream); diff --git a/packages/agent/src/adapters/codex/codex-agent.ts b/packages/agent/src/adapters/codex/codex-agent.ts index 9ed83ec57..1ac0fa5b3 100644 --- a/packages/agent/src/adapters/codex/codex-agent.ts +++ b/packages/agent/src/adapters/codex/codex-agent.ts @@ -9,6 +9,8 @@ * - System prompt injection */ +import { existsSync } from "node:fs"; +import { resolve as resolvePath } from "node:path"; import { type AgentSideConnection, type AuthenticateRequest, @@ -22,6 +24,7 @@ import { type LoadSessionRequest, type LoadSessionResponse, type McpServer, + type McpServerStdio, type NewSessionRequest, type NewSessionResponse, ndJsonStream, @@ -85,12 +88,14 @@ interface NewSessionMeta { additionalRoots?: string[]; disableBuiltInTools?: boolean; allowedDomains?: string[]; + jsonSchema?: Record | null; } export interface CodexAcpAgentOptions { codexProcessOptions: CodexProcessOptions; processCallbacks?: ProcessSpawnedCallback; posthogApiConfig?: PostHogAPIConfig; + onStructuredOutput?: (output: Record) => Promise; } type CodexSession = BaseSession & { @@ -152,6 +157,56 @@ function getCurrentPermissionMode( return toCodexPermissionMode(fallbackMode); } +/** + * Name of the injected stdio MCP server that exposes `create_output`. + * Kept stable so tool-call updates can be matched to our server. + */ +export const STRUCTURED_OUTPUT_MCP_NAME = "posthog_output"; +export const STRUCTURED_OUTPUT_TOOL_NAME = "create_output"; + +const STRUCTURED_OUTPUT_INSTRUCTIONS = `\n\nWhen you have completed the task, call the \`${STRUCTURED_OUTPUT_TOOL_NAME}\` tool with the final structured result. The tool's input schema matches the required output format for this task. Do not describe the result in a plain message — submitting it via the tool is required for the task to be considered complete.`; + +/** + * Builds the stdio MCP server config that exposes the `create_output` tool. + * The child process validates tool input against the JSON schema with AJV. + * We pass the schema as a base64-encoded env var to avoid shell escaping. + * + * Path resolves relative to the compiled adapter location. When bundled into + * different entry points (dist/agent.js, dist/server/bin.cjs, dist/server/ + * harness/bin.js, etc), `import.meta.dirname` sits at different depths. Walk + * up until we find the script so each bundle locates the shared dist asset. + */ +function resolveStructuredOutputMcpScript(): string { + const override = process.env.POSTHOG_STRUCTURED_OUTPUT_MCP_SCRIPT; + if (override && existsSync(override)) return override; + + const rel = "adapters/codex/structured-output-mcp-server.js"; + let dir = import.meta.dirname ?? __dirname; + for (let i = 0; i < 5; i++) { + const candidate = resolvePath(dir, rel); + if (existsSync(candidate)) return candidate; + dir = resolvePath(dir, ".."); + } + throw new Error( + `Could not locate ${rel} relative to ${import.meta.dirname ?? __dirname}. Set POSTHOG_STRUCTURED_OUTPUT_MCP_SCRIPT to override.`, + ); +} + +function buildStructuredOutputMcpServer( + jsonSchema: Record, +): McpServerStdio { + const scriptPath = resolveStructuredOutputMcpScript(); + const schemaBase64 = Buffer.from(JSON.stringify(jsonSchema)).toString( + "base64", + ); + return { + name: STRUCTURED_OUTPUT_MCP_NAME, + command: process.execPath, + args: [scriptPath], + env: [{ name: "POSTHOG_OUTPUT_SCHEMA", value: schemaBase64 }], + }; +} + export class CodexAcpAgent extends BaseAcpAgent { readonly adapterName = "codex"; declare session: CodexSession; @@ -171,6 +226,9 @@ export class CodexAcpAgent extends BaseAcpAgent { private promptMutex: Promise = Promise.resolve(); private readonly codexProcessOptions: CodexProcessOptions; private readonly processCallbacks?: ProcessSpawnedCallback; + private readonly onStructuredOutput?: ( + output: Record, + ) => Promise; // Snapshot of the initialize() request so refreshSession can replay the // same handshake against a respawned codex-acp subprocess. private lastInitRequest?: InitializeRequest; @@ -187,6 +245,7 @@ export class CodexAcpAgent extends BaseAcpAgent { this.codexProcessOptions = options.codexProcessOptions; this.processCallbacks = options.processCallbacks; + this.onStructuredOutput = options.onStructuredOutput; // Spawn the codex-acp subprocess this.codexProcess = spawnCodexProcess({ @@ -221,6 +280,7 @@ export class CodexAcpAgent extends BaseAcpAgent { (_agent) => createCodexClient(this.client, this.logger, this.sessionState, { enrichmentDeps: this.enrichment?.deps, + onStructuredOutput: this.onStructuredOutput, }), codexStream, ); @@ -264,7 +324,8 @@ export class CodexAcpAgent extends BaseAcpAgent { const meta = params._meta as NewSessionMeta | undefined; const requestedPermissionMode = toCodexPermissionMode(meta?.permissionMode); - const response = await this.codexConnection.newSession(params); + const injectedParams = this.applyStructuredOutput(params, meta); + const response = await this.codexConnection.newSession(injectedParams); // Initialize session state this.sessionState = createSessionState(response.sessionId, params.cwd, { @@ -301,8 +362,9 @@ export class CodexAcpAgent extends BaseAcpAgent { } async loadSession(params: LoadSessionRequest): Promise { - const response = await this.codexConnection.loadSession(params); const meta = params._meta as NewSessionMeta | undefined; + const injectedParams = this.applyStructuredOutput(params, meta); + const response = await this.codexConnection.loadSession(injectedParams); const currentPermissionMode = getCurrentPermissionMode( response.modes?.currentModeId, meta?.permissionMode, @@ -335,14 +397,19 @@ export class CodexAcpAgent extends BaseAcpAgent { async unstable_resumeSession( params: ResumeSessionRequest, ): Promise { - // codex-acp doesn't support resume natively, use loadSession instead - const loadResponse = await this.codexConnection.loadSession({ - sessionId: params.sessionId, - cwd: params.cwd, - mcpServers: params.mcpServers ?? [], - }); - const meta = params._meta as NewSessionMeta | undefined; + const injectedParams = this.applyStructuredOutput( + { + sessionId: params.sessionId, + cwd: params.cwd, + mcpServers: params.mcpServers ?? [], + _meta: params._meta, + }, + meta, + ); + + // codex-acp doesn't support resume natively, use loadSession instead + const loadResponse = await this.codexConnection.loadSession(injectedParams); const currentPermissionMode = getCurrentPermissionMode( loadResponse.modes?.currentModeId, meta?.permissionMode, @@ -374,14 +441,19 @@ export class CodexAcpAgent extends BaseAcpAgent { async unstable_forkSession( params: ForkSessionRequest, ): Promise { + const meta = params._meta as NewSessionMeta | undefined; + const injectedParams = this.applyStructuredOutput( + { + cwd: params.cwd, + mcpServers: params.mcpServers ?? [], + _meta: params._meta, + }, + meta, + ); + // Create a new session via codex-acp (fork isn't natively supported) - const newResponse = await this.codexConnection.newSession({ - cwd: params.cwd, - mcpServers: params.mcpServers ?? [], - _meta: params._meta, - }); + const newResponse = await this.codexConnection.newSession(injectedParams); - const meta = params._meta as NewSessionMeta | undefined; const requestedPermissionMode = toCodexPermissionMode(meta?.permissionMode); this.sessionState = createSessionState(newResponse.sessionId, params.cwd, { taskRunId: meta?.taskRunId, @@ -401,6 +473,38 @@ export class CodexAcpAgent extends BaseAcpAgent { return newResponse; } + /** + * When the caller wires up `onStructuredOutput` and provides a JSON schema + * via `_meta.jsonSchema`, inject the stdio MCP server that exposes + * `create_output` and append instructions telling the model to use it. + * + * Codex has no native equivalent of Claude's `outputFormat`, so we lean on + * MCP tool-calling to get validated structured output back. + */ + private applyStructuredOutput< + T extends { mcpServers?: McpServer[]; _meta?: unknown }, + >(request: T, meta: NewSessionMeta | undefined): T { + if (!meta?.jsonSchema || !this.onStructuredOutput) { + return request; + } + + const mcpServer = buildStructuredOutputMcpServer(meta.jsonSchema); + const existingMeta = (request._meta ?? {}) as Record; + const existingSystemPrompt = + typeof existingMeta.systemPrompt === "string" + ? existingMeta.systemPrompt + : ""; + + return { + ...request, + mcpServers: [...(request.mcpServers ?? []), mcpServer], + _meta: { + ...existingMeta, + systemPrompt: existingSystemPrompt + STRUCTURED_OUTPUT_INSTRUCTIONS, + }, + }; + } + private async applyInitialPermissionMode( sessionId: string, permissionMode?: string, @@ -617,7 +721,9 @@ export class CodexAcpAgent extends BaseAcpAgent { const newAbortController = new AbortController(); const newConnection = new ClientSideConnection( (_agent) => - createCodexClient(this.client, this.logger, this.sessionState), + createCodexClient(this.client, this.logger, this.sessionState, { + onStructuredOutput: this.onStructuredOutput, + }), codexStream, ); diff --git a/packages/agent/src/adapters/codex/codex-client.ts b/packages/agent/src/adapters/codex/codex-client.ts index 727d8876b..c02caf345 100644 --- a/packages/agent/src/adapters/codex/codex-client.ts +++ b/packages/agent/src/adapters/codex/codex-client.ts @@ -42,6 +42,32 @@ export interface CodexClientCallbacks { onUsageUpdate?: (update: Record) => void; /** When set, Read responses are annotated with PostHog enrichment before reaching codex-acp. */ enrichmentDeps?: FileEnrichmentDeps; + /** + * Called once per session when the agent completes the injected + * `create_output` MCP tool. Matches the Claude adapter's structured + * output delivery. + */ + onStructuredOutput?: (output: Record) => Promise; +} + +const STRUCTURED_OUTPUT_TOOL_NAME = "create_output"; + +/** + * Tool calls for our injected MCP server surface in ACP `tool_call` / + * `tool_call_update` notifications. The `title` from codex-acp can be + * either the bare tool name or prefixed (`mcp____`); match + * permissively so behavior is stable across codex-acp versions. + */ +function isStructuredOutputToolCall(title: string | undefined | null): boolean { + if (!title) return false; + return title.includes(STRUCTURED_OUTPUT_TOOL_NAME); +} + +function toRecord(value: unknown): Record | null { + if (value && typeof value === "object" && !Array.isArray(value)) { + return value as Record; + } + return null; } const AUTO_APPROVED_KINDS: Record> = { @@ -96,6 +122,14 @@ export function createCodexClient( callbacks?: CodexClientCallbacks, ): Client { const terminalHandles = new Map(); + // Track rawInput across tool_call → tool_call_update → completed so we can + // fire onStructuredOutput exactly once per tool call id. Entries are + // removed after firing so a retry (if codex-acp ever re-emits) won't + // double-fire. + const structuredOutputState = new Map< + string, + { rawInput?: Record; fired: boolean } + >(); return { async requestPermission( @@ -125,6 +159,34 @@ export function createCodexClient( async sessionUpdate(params: SessionNotification): Promise { const update = params.update as Record | undefined; + + if ( + callbacks?.onStructuredOutput && + (update?.sessionUpdate === "tool_call" || + update?.sessionUpdate === "tool_call_update") + ) { + const toolCallId = update.toolCallId as string | undefined; + const title = update.title as string | undefined; + if (toolCallId && isStructuredOutputToolCall(title)) { + const entry = structuredOutputState.get(toolCallId) ?? { + fired: false, + }; + const rawInput = toRecord(update.rawInput); + if (rawInput) entry.rawInput = rawInput; + structuredOutputState.set(toolCallId, entry); + + if (update.status === "completed" && !entry.fired && entry.rawInput) { + entry.fired = true; + structuredOutputState.delete(toolCallId); + try { + await callbacks.onStructuredOutput(entry.rawInput); + } catch (err) { + logger.warn("onStructuredOutput callback threw", { error: err }); + } + } + } + } + if (update?.sessionUpdate === "usage_update") { const used = update.used as number | undefined; const size = update.size as number | undefined; diff --git a/packages/agent/src/adapters/codex/structured-output-mcp-server.ts b/packages/agent/src/adapters/codex/structured-output-mcp-server.ts new file mode 100644 index 000000000..188f0da63 --- /dev/null +++ b/packages/agent/src/adapters/codex/structured-output-mcp-server.ts @@ -0,0 +1,81 @@ +/** + * Standalone stdio MCP server for structured output in the Codex adapter. + * + * Spawned by codex-acp as an MCP server process. Reads the JSON schema + * from the POSTHOG_OUTPUT_SCHEMA env var (base64-encoded), registers a + * `create_output` tool, and validates input with AJV. + * + * Usage: + * POSTHOG_OUTPUT_SCHEMA= node structured-output-mcp-server.js + */ + +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; +import Ajv from "ajv"; +import { z } from "zod"; + +const OUTPUT_TOOL_NAME = "create_output"; + +const schemaEnv = process.env.POSTHOG_OUTPUT_SCHEMA; +if (!schemaEnv) { + process.exit(1); +} + +let jsonSchema: Record; +try { + jsonSchema = JSON.parse(Buffer.from(schemaEnv, "base64").toString("utf-8")); +} catch (_err) { + process.exit(1); +} + +const ajv = new Ajv({ allErrors: true }); +const validate = ajv.compile(jsonSchema); + +const zodType = z.fromJSONSchema(jsonSchema); +if (!(zodType instanceof z.ZodObject)) { + process.exit(1); +} +// McpServer.tool() expects a mutable ZodRawShape +const zodShape = { ...zodType.shape } as z.ZodRawShape; + +const server = new McpServer({ + name: "posthog_output", + version: "1.0.0", +}); + +server.tool( + OUTPUT_TOOL_NAME, + "Submit the structured output for this task. Call this tool with the required fields to deliver your final result.", + zodShape, + async (args) => { + const valid = validate(args); + if (!valid) { + const errors = validate.errors + ?.map((e) => `${e.instancePath || "/"}: ${e.message}`) + .join("; "); + return { + content: [ + { + type: "text" as const, + text: `Validation failed: ${errors}. Please fix the output and try again.`, + }, + ], + isError: true, + }; + } + + // Output is valid — return success. The parent process captures + // the validated output by intercepting the tool call in the ACP stream. + return { + content: [ + { + type: "text" as const, + text: "Output submitted successfully.", + }, + ], + }; + }, +); + +const transport = new StdioServerTransport(); +await server.connect(transport); diff --git a/packages/agent/tsup.config.ts b/packages/agent/tsup.config.ts index a21a206b5..130344cea 100644 --- a/packages/agent/tsup.config.ts +++ b/packages/agent/tsup.config.ts @@ -86,6 +86,7 @@ export default defineConfig([ "src/adapters/claude/session/models.ts", "src/adapters/codex/models.ts", "src/adapters/claude/mcp/tool-metadata.ts", + "src/adapters/codex/structured-output-mcp-server.ts", "src/adapters/reasoning-effort.ts", "src/execution-mode.ts", "src/server/schemas.ts", From 83622d00cf600dc4d66f724d826185115cfe133d Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Sun, 26 Apr 2026 15:30:00 -0600 Subject: [PATCH 2/3] Fix testing --- .../src/adapters/codex/codex-agent.test.ts | 134 ++++++++++++- .../src/adapters/codex/codex-client.test.ts | 178 ++++++++++++++++++ .../agent/src/adapters/codex/codex-client.ts | 7 +- pnpm-lock.yaml | 39 ++-- 4 files changed, 334 insertions(+), 24 deletions(-) diff --git a/packages/agent/src/adapters/codex/codex-agent.test.ts b/packages/agent/src/adapters/codex/codex-agent.test.ts index 559821ffa..092baf030 100644 --- a/packages/agent/src/adapters/codex/codex-agent.test.ts +++ b/packages/agent/src/adapters/codex/codex-agent.test.ts @@ -4,7 +4,7 @@ import type { LoadSessionResponse, NewSessionResponse, } from "@agentclientprotocol/sdk"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; const mockCodexConnection = { initialize: vi.fn(), @@ -60,7 +60,12 @@ describe("CodexAcpAgent", () => { vi.clearAllMocks(); }); - function createAgent(overrides: Partial = {}): { + function createAgent( + overrides: Partial = {}, + agentOptions?: { + onStructuredOutput?: (output: Record) => Promise; + }, + ): { agent: CodexAcpAgent; client: AgentSideConnection & { extNotification: ReturnType; @@ -80,6 +85,7 @@ describe("CodexAcpAgent", () => { codexProcessOptions: { cwd: process.cwd(), }, + onStructuredOutput: agentOptions?.onStructuredOutput, }); return { agent, client }; } @@ -295,6 +301,130 @@ describe("CodexAcpAgent", () => { ).resolves.toEqual({ stopReason: "end_turn" }); }); + describe("structured output injection", () => { + const schema = { + type: "object", + properties: { answer: { type: "string" } }, + required: ["answer"], + } as const; + + beforeEach(() => { + // The resolver insists the script path exists. Point at the node + // binary itself — always present, and the agent only forwards the + // path to codex-acp; nothing in this test actually spawns it. + vi.stubEnv("POSTHOG_STRUCTURED_OUTPUT_MCP_SCRIPT", process.execPath); + }); + + afterEach(() => { + vi.unstubAllEnvs(); + }); + + it("injects the create_output MCP server and system-prompt note when jsonSchema and callback are present", async () => { + const { agent } = createAgent({}, { onStructuredOutput: vi.fn() }); + mockCodexConnection.newSession.mockResolvedValue({ + sessionId: "session-1", + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + + await agent.newSession({ + cwd: process.cwd(), + mcpServers: [{ name: "existing", command: "echo", args: [], env: [] }], + _meta: { jsonSchema: schema, systemPrompt: "be terse." }, + } as never); + + const forwarded = mockCodexConnection.newSession.mock.calls[0][0] as { + mcpServers: Array<{ name: string; command: string; env: unknown }>; + _meta: { systemPrompt: string }; + }; + + // Existing MCP server is preserved; ours is appended. + expect(forwarded.mcpServers).toHaveLength(2); + expect(forwarded.mcpServers[0].name).toBe("existing"); + expect(forwarded.mcpServers[1].name).toBe("posthog_output"); + expect(forwarded.mcpServers[1].command).toBe(process.execPath); + + // The schema is forwarded base64-encoded so codex-acp doesn't have + // to escape it through a shell. + const envEntry = ( + forwarded.mcpServers[1].env as Array<{ name: string; value: string }> + ).find((e) => e.name === "POSTHOG_OUTPUT_SCHEMA"); + expect(envEntry).toBeDefined(); + const decoded = JSON.parse( + Buffer.from(envEntry?.value ?? "", "base64").toString("utf-8"), + ); + expect(decoded).toEqual(schema); + + // Existing systemPrompt is preserved with the structured-output + // instruction appended (not overwritten). + expect(forwarded._meta.systemPrompt.startsWith("be terse.")).toBe(true); + expect(forwarded._meta.systemPrompt).toContain("create_output"); + }); + + it("is a no-op when jsonSchema is absent", async () => { + const { agent } = createAgent({}, { onStructuredOutput: vi.fn() }); + mockCodexConnection.newSession.mockResolvedValue({ + sessionId: "session-1", + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + + await agent.newSession({ + cwd: process.cwd(), + mcpServers: [], + } as never); + + const forwarded = mockCodexConnection.newSession.mock.calls[0][0] as { + mcpServers: unknown[]; + _meta?: { systemPrompt?: string }; + }; + expect(forwarded.mcpServers).toEqual([]); + expect(forwarded._meta?.systemPrompt).toBeUndefined(); + }); + + it("is a no-op when onStructuredOutput callback is not wired", async () => { + const { agent } = createAgent(); + mockCodexConnection.newSession.mockResolvedValue({ + sessionId: "session-1", + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + + await agent.newSession({ + cwd: process.cwd(), + mcpServers: [], + _meta: { jsonSchema: schema }, + } as never); + + const forwarded = mockCodexConnection.newSession.mock.calls[0][0] as { + mcpServers: unknown[]; + }; + expect(forwarded.mcpServers).toEqual([]); + }); + + it("also injects on loadSession", async () => { + const { agent } = createAgent({}, { onStructuredOutput: vi.fn() }); + mockCodexConnection.loadSession.mockResolvedValue({ + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + + await agent.loadSession({ + sessionId: "session-1", + cwd: process.cwd(), + mcpServers: [], + _meta: { jsonSchema: schema }, + } as never); + + const forwarded = mockCodexConnection.loadSession.mock.calls[0][0] as { + mcpServers: Array<{ name: string }>; + }; + expect(forwarded.mcpServers.map((s) => s.name)).toContain( + "posthog_output", + ); + }); + }); + it("broadcasts user prompt as user_message_chunk before delegating to codex-acp", async () => { const { agent, client } = createAgent(); // Seed an active session so prompt() has the state it expects. diff --git a/packages/agent/src/adapters/codex/codex-client.test.ts b/packages/agent/src/adapters/codex/codex-client.test.ts index c012a2b4b..68ea50ad7 100644 --- a/packages/agent/src/adapters/codex/codex-client.test.ts +++ b/packages/agent/src/adapters/codex/codex-client.test.ts @@ -2,6 +2,7 @@ import type { AgentSideConnection, ReadTextFileRequest, ReadTextFileResponse, + SessionNotification, } from "@agentclientprotocol/sdk"; import { describe, expect, test, vi } from "vitest"; import type { FileEnrichmentDeps } from "../../enrichment/file-enricher"; @@ -110,3 +111,180 @@ describe("createCodexClient readTextFile", () => { expect(upstream.readTextFile).toHaveBeenCalledWith(params); }); }); + +describe("createCodexClient onStructuredOutput", () => { + const logger = new Logger({ debug: false, prefix: "[test]" }); + const sessionState = createSessionState("sess", "/tmp"); + + function makeUpstream(): AgentSideConnection { + return { + sessionUpdate: vi.fn(async () => {}), + requestPermission: vi.fn(), + readTextFile: vi.fn(), + writeTextFile: vi.fn(), + createTerminal: vi.fn(), + terminalOutput: vi.fn(), + releaseTerminal: vi.fn(), + waitForTerminalExit: vi.fn(), + killTerminal: vi.fn(), + extMethod: vi.fn(), + extNotification: vi.fn(), + } as unknown as AgentSideConnection; + } + + function notification(update: Record): SessionNotification { + return { + sessionId: "sess", + update, + } as unknown as SessionNotification; + } + + test("fires once when create_output completes after rawInput arrived", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "in_progress", + rawInput: { result: "ok", count: 5 }, + }), + ); + expect(onStructuredOutput).not.toHaveBeenCalled(); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call_update", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + }), + ); + + expect(onStructuredOutput).toHaveBeenCalledTimes(1); + expect(onStructuredOutput).toHaveBeenCalledWith({ result: "ok", count: 5 }); + }); + + test("matches mcp__-prefixed tool titles", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "mcp__posthog_output__create_output", + status: "completed", + rawInput: { ok: true }, + }), + ); + + expect(onStructuredOutput).toHaveBeenCalledWith({ ok: true }); + }); + + test("ignores tool calls that aren't create_output", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "Read", + status: "completed", + rawInput: { path: "/tmp/x" }, + }), + ); + + expect(onStructuredOutput).not.toHaveBeenCalled(); + }); + + test("does not fire when rawInput never arrived", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + }), + ); + + expect(onStructuredOutput).not.toHaveBeenCalled(); + }); + + test("does not fire twice if completed is re-emitted for the same tool call", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + const completed = notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + rawInput: { final: 1 }, + }); + + await client.sessionUpdate?.(completed); + await client.sessionUpdate?.(completed); + + expect(onStructuredOutput).toHaveBeenCalledTimes(1); + }); + + test("forwards the notification upstream regardless of structured-output handling", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + const note = notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + rawInput: { final: 1 }, + }); + await client.sessionUpdate?.(note); + + expect(upstream.sessionUpdate).toHaveBeenCalledWith(note); + }); + + test("does nothing when the callback is not wired", async () => { + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState); + + // No onStructuredOutput configured — must not throw and must still + // forward upstream. + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + rawInput: { x: 1 }, + }), + ); + + expect(upstream.sessionUpdate).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/agent/src/adapters/codex/codex-client.ts b/packages/agent/src/adapters/codex/codex-client.ts index c02caf345..4631f035f 100644 --- a/packages/agent/src/adapters/codex/codex-client.ts +++ b/packages/agent/src/adapters/codex/codex-client.ts @@ -123,9 +123,9 @@ export function createCodexClient( ): Client { const terminalHandles = new Map(); // Track rawInput across tool_call → tool_call_update → completed so we can - // fire onStructuredOutput exactly once per tool call id. Entries are - // removed after firing so a retry (if codex-acp ever re-emits) won't - // double-fire. + // fire onStructuredOutput exactly once per tool call id. Entries stay in + // the map after firing with `fired: true` so a re-emitted completion + // (if codex-acp ever resends one) is a no-op. const structuredOutputState = new Map< string, { rawInput?: Record; fired: boolean } @@ -177,7 +177,6 @@ export function createCodexClient( if (update.status === "completed" && !entry.fired && entry.rawInput) { entry.fired = true; - structuredOutputState.delete(toolCallId); try { await callbacks.onStructuredOutput(entry.rawInput); } catch (err) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cf9657e97..3d21f6f35 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -261,7 +261,7 @@ importers: version: 17.2.3 drizzle-orm: specifier: ^0.45.1 - version: 0.45.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.8.0)(bun-types@1.3.12) + version: 0.45.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.8.0)(bun-types@1.3.13) electron-log: specifier: ^5.4.3 version: 5.4.3 @@ -663,6 +663,9 @@ importers: '@hono/node-server': specifier: ^1.19.9 version: 1.19.9(hono@4.11.7) + '@modelcontextprotocol/sdk': + specifier: 1.29.0 + version: 1.29.0(zod@4.3.6) '@opentelemetry/api-logs': specifier: ^0.208.0 version: 0.208.0 @@ -720,7 +723,7 @@ importers: version: link:../shared '@types/bun': specifier: latest - version: 1.3.12 + version: 1.3.13 '@types/tar': specifier: ^6.1.13 version: 6.1.13 @@ -5184,8 +5187,8 @@ packages: '@types/better-sqlite3@7.6.13': resolution: {integrity: sha512-NMv9ASNARoKksWtsq/SHakpYAYnhBrQgGD8zkLYk/jaK8jUGn08CfEdTRgYhMypUQAfzSP8W6gNLe0q19/t4VA==} - '@types/bun@1.3.12': - resolution: {integrity: sha512-DBv81elK+/VSwXHDlnH3Qduw+KxkTIWi7TXkAeh24zpi5l0B2kUg9Ga3tb4nJaPcOFswflgi/yAvMVBPrxMB+A==} + '@types/bun@1.3.13': + resolution: {integrity: sha512-9fqXWk5YIHGGnUau9TEi+qdlTYDAnOj+xLCmSTwXfAIqXr2x4tytJb43E9uCvt09zJURKXwAtkoH4nLQfzeTXw==} '@types/cacheable-request@6.0.3': resolution: {integrity: sha512-IQ3EbTzGxIigb1I3qPZc1rWJnH0BmSKv5QYTalEwweFvyBDLSAe24zP0le/hyi7ecGfZVlIVAg4BZqb8WBwKqw==} @@ -5916,8 +5919,8 @@ packages: buffer@6.0.3: resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==} - bun-types@1.3.12: - resolution: {integrity: sha512-HqOLj5PoFajAQciOMRiIZGNoKxDJSr6qigAttOX40vJuSp6DN/CxWp9s3C1Xwm4oH7ybueITwiaOcWXoYVoRkA==} + bun-types@1.3.13: + resolution: {integrity: sha512-QXKeHLlOLqQX9LgYaHJfzdBaV21T63HhFJnvuRCcjZiaUDpbs5ED1MgxbMra71CsryN/1dAoXuJJJwIv/2drVA==} bundle-name@4.1.0: resolution: {integrity: sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q==} @@ -14779,7 +14782,7 @@ snapshots: react: 19.1.0 react-dom: 19.1.0(react@19.1.0) - '@modelcontextprotocol/sdk@1.27.1(zod@3.25.76)': + '@modelcontextprotocol/sdk@1.27.1(zod@4.3.6)': dependencies: '@hono/node-server': 1.19.9(hono@4.11.7) ajv: 8.17.1 @@ -14796,12 +14799,12 @@ snapshots: json-schema-typed: 8.0.2 pkce-challenge: 5.0.1 raw-body: 3.0.2 - zod: 3.25.76 - zod-to-json-schema: 3.25.1(zod@3.25.76) + zod: 4.3.6 + zod-to-json-schema: 3.25.1(zod@4.3.6) transitivePeerDependencies: - supports-color - '@modelcontextprotocol/sdk@1.27.1(zod@4.3.6)': + '@modelcontextprotocol/sdk@1.29.0(zod@3.25.76)': dependencies: '@hono/node-server': 1.19.9(hono@4.11.7) ajv: 8.17.1 @@ -14818,8 +14821,8 @@ snapshots: json-schema-typed: 8.0.2 pkce-challenge: 5.0.1 raw-body: 3.0.2 - zod: 4.3.6 - zod-to-json-schema: 3.25.1(zod@4.3.6) + zod: 3.25.76 + zod-to-json-schema: 3.25.1(zod@3.25.76) transitivePeerDependencies: - supports-color @@ -16906,9 +16909,9 @@ snapshots: dependencies: '@types/node': 24.12.0 - '@types/bun@1.3.12': + '@types/bun@1.3.13': dependencies: - bun-types: 1.3.12 + bun-types: 1.3.13 '@types/cacheable-request@6.0.3': dependencies: @@ -17788,7 +17791,7 @@ snapshots: base64-js: 1.5.1 ieee754: 1.2.1 - bun-types@1.3.12: + bun-types@1.3.13: dependencies: '@types/node': 24.12.0 @@ -18387,12 +18390,12 @@ snapshots: transitivePeerDependencies: - supports-color - drizzle-orm@0.45.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.8.0)(bun-types@1.3.12): + drizzle-orm@0.45.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.8.0)(bun-types@1.3.13): optionalDependencies: '@opentelemetry/api': 1.9.0 '@types/better-sqlite3': 7.6.13 better-sqlite3: 12.8.0 - bun-types: 1.3.12 + bun-types: 1.3.13 ds-store@0.1.6: dependencies: @@ -23012,7 +23015,7 @@ snapshots: '@babel/plugin-transform-typescript': 7.28.6(@babel/core@7.29.0) '@babel/preset-typescript': 7.28.5(@babel/core@7.29.0) '@dotenvx/dotenvx': 1.60.1 - '@modelcontextprotocol/sdk': 1.27.1(zod@3.25.76) + '@modelcontextprotocol/sdk': 1.29.0(zod@3.25.76) '@types/validate-npm-package-name': 4.0.2 browserslist: 4.28.1 commander: 14.0.3 From 8a3b9019b451b33eb5add565f075b4d76e87734e Mon Sep 17 00:00:00 2001 From: Ryan Sutton Date: Sun, 26 Apr 2026 22:46:47 -0600 Subject: [PATCH 3/3] fix(agent): tighten structured-output MCP wiring per review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Require both server and tool name when matching prefixed tool titles so unrelated user tools containing "create_output" can't trigger onStructuredOutput. - Hoist STRUCTURED_OUTPUT_MCP_NAME / STRUCTURED_OUTPUT_TOOL_NAME into a shared constants module so codex-agent, codex-client, and the spawned MCP server stay in sync. - Drop redundant AJV pass in the MCP server — McpServer.tool() already validates against the Zod shape derived from the schema. - Print a diagnostic to stderr before each early process.exit(1) so spawn failures aren't silent. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/agent/package.json | 1 - .../agent/src/adapters/codex/codex-agent.ts | 16 +++--- .../agent/src/adapters/codex/codex-client.ts | 15 +++-- .../codex/structured-output-constants.ts | 9 +++ .../codex/structured-output-mcp-server.ts | 55 ++++++++----------- pnpm-lock.yaml | 3 - 6 files changed, 52 insertions(+), 47 deletions(-) create mode 100644 packages/agent/src/adapters/codex/structured-output-constants.ts diff --git a/packages/agent/package.json b/packages/agent/package.json index d05687010..4ffd80ec3 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -117,7 +117,6 @@ }, "dependencies": { "@agentclientprotocol/sdk": "0.19.0", - "ajv": "^8.17.1", "@anthropic-ai/claude-agent-sdk": "0.2.112", "@anthropic-ai/sdk": "0.89.0", "@hono/node-server": "^1.19.9", diff --git a/packages/agent/src/adapters/codex/codex-agent.ts b/packages/agent/src/adapters/codex/codex-agent.ts index 1ac0fa5b3..0be25e1e7 100644 --- a/packages/agent/src/adapters/codex/codex-agent.ts +++ b/packages/agent/src/adapters/codex/codex-agent.ts @@ -74,6 +74,15 @@ import { type CodexProcessOptions, spawnCodexProcess, } from "./spawn"; +import { + STRUCTURED_OUTPUT_MCP_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, +} from "./structured-output-constants"; + +export { + STRUCTURED_OUTPUT_MCP_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, +} from "./structured-output-constants"; interface NewSessionMeta { taskRunId?: string; @@ -157,13 +166,6 @@ function getCurrentPermissionMode( return toCodexPermissionMode(fallbackMode); } -/** - * Name of the injected stdio MCP server that exposes `create_output`. - * Kept stable so tool-call updates can be matched to our server. - */ -export const STRUCTURED_OUTPUT_MCP_NAME = "posthog_output"; -export const STRUCTURED_OUTPUT_TOOL_NAME = "create_output"; - const STRUCTURED_OUTPUT_INSTRUCTIONS = `\n\nWhen you have completed the task, call the \`${STRUCTURED_OUTPUT_TOOL_NAME}\` tool with the final structured result. The tool's input schema matches the required output format for this task. Do not describe the result in a plain message — submitting it via the tool is required for the task to be considered complete.`; /** diff --git a/packages/agent/src/adapters/codex/codex-client.ts b/packages/agent/src/adapters/codex/codex-client.ts index 4631f035f..ebe05a4fe 100644 --- a/packages/agent/src/adapters/codex/codex-client.ts +++ b/packages/agent/src/adapters/codex/codex-client.ts @@ -36,6 +36,10 @@ import { import type { PermissionMode } from "../../execution-mode"; import type { Logger } from "../../utils/logger"; import type { CodexSessionState } from "./session-state"; +import { + STRUCTURED_OUTPUT_MCP_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, +} from "./structured-output-constants"; export interface CodexClientCallbacks { /** Called when a usage_update session notification is received */ @@ -50,17 +54,20 @@ export interface CodexClientCallbacks { onStructuredOutput?: (output: Record) => Promise; } -const STRUCTURED_OUTPUT_TOOL_NAME = "create_output"; - /** * Tool calls for our injected MCP server surface in ACP `tool_call` / * `tool_call_update` notifications. The `title` from codex-acp can be * either the bare tool name or prefixed (`mcp____`); match - * permissively so behavior is stable across codex-acp versions. + * both forms but require the server name on prefixed titles so an unrelated + * user tool happening to contain `create_output` doesn't trigger us. */ function isStructuredOutputToolCall(title: string | undefined | null): boolean { if (!title) return false; - return title.includes(STRUCTURED_OUTPUT_TOOL_NAME); + if (title === STRUCTURED_OUTPUT_TOOL_NAME) return true; + return ( + title.includes(STRUCTURED_OUTPUT_MCP_NAME) && + title.includes(STRUCTURED_OUTPUT_TOOL_NAME) + ); } function toRecord(value: unknown): Record | null { diff --git a/packages/agent/src/adapters/codex/structured-output-constants.ts b/packages/agent/src/adapters/codex/structured-output-constants.ts new file mode 100644 index 000000000..739e2a775 --- /dev/null +++ b/packages/agent/src/adapters/codex/structured-output-constants.ts @@ -0,0 +1,9 @@ +/** + * Shared identifiers for the injected structured-output MCP server. + * Imported by codex-agent.ts (server config), codex-client.ts (tool-call + * matching), and structured-output-mcp-server.ts (tool registration) so the + * three stay in sync. + */ + +export const STRUCTURED_OUTPUT_MCP_NAME = "posthog_output"; +export const STRUCTURED_OUTPUT_TOOL_NAME = "create_output"; diff --git a/packages/agent/src/adapters/codex/structured-output-mcp-server.ts b/packages/agent/src/adapters/codex/structured-output-mcp-server.ts index 188f0da63..425b5ae08 100644 --- a/packages/agent/src/adapters/codex/structured-output-mcp-server.ts +++ b/packages/agent/src/adapters/codex/structured-output-mcp-server.ts @@ -2,8 +2,8 @@ * Standalone stdio MCP server for structured output in the Codex adapter. * * Spawned by codex-acp as an MCP server process. Reads the JSON schema - * from the POSTHOG_OUTPUT_SCHEMA env var (base64-encoded), registers a - * `create_output` tool, and validates input with AJV. + * from the POSTHOG_OUTPUT_SCHEMA env var (base64-encoded) and registers + * a tool whose Zod shape McpServer.tool() validates on each call. * * Usage: * POSTHOG_OUTPUT_SCHEMA= node structured-output-mcp-server.js @@ -11,61 +11,52 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; -import Ajv from "ajv"; import { z } from "zod"; +import { + STRUCTURED_OUTPUT_MCP_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, +} from "./structured-output-constants"; -const OUTPUT_TOOL_NAME = "create_output"; +function die(message: string): never { + process.stderr.write(`[structured-output-mcp-server] ${message}\n`); + process.exit(1); +} const schemaEnv = process.env.POSTHOG_OUTPUT_SCHEMA; if (!schemaEnv) { - process.exit(1); + die("POSTHOG_OUTPUT_SCHEMA env var is required"); } let jsonSchema: Record; try { jsonSchema = JSON.parse(Buffer.from(schemaEnv, "base64").toString("utf-8")); -} catch (_err) { - process.exit(1); +} catch (err) { + die(`Failed to parse POSTHOG_OUTPUT_SCHEMA as base64-encoded JSON: ${err}`); } -const ajv = new Ajv({ allErrors: true }); -const validate = ajv.compile(jsonSchema); - const zodType = z.fromJSONSchema(jsonSchema); if (!(zodType instanceof z.ZodObject)) { - process.exit(1); + die( + `POSTHOG_OUTPUT_SCHEMA must describe a JSON object schema (got ${zodType.constructor.name})`, + ); } // McpServer.tool() expects a mutable ZodRawShape const zodShape = { ...zodType.shape } as z.ZodRawShape; const server = new McpServer({ - name: "posthog_output", + name: STRUCTURED_OUTPUT_MCP_NAME, version: "1.0.0", }); server.tool( - OUTPUT_TOOL_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, "Submit the structured output for this task. Call this tool with the required fields to deliver your final result.", zodShape, - async (args) => { - const valid = validate(args); - if (!valid) { - const errors = validate.errors - ?.map((e) => `${e.instancePath || "/"}: ${e.message}`) - .join("; "); - return { - content: [ - { - type: "text" as const, - text: `Validation failed: ${errors}. Please fix the output and try again.`, - }, - ], - isError: true, - }; - } - - // Output is valid — return success. The parent process captures - // the validated output by intercepting the tool call in the ACP stream. + async () => { + // McpServer.tool() validates `args` against `zodShape` before invoking + // this handler, so reaching this point means the input is valid. The + // parent process captures the validated output by intercepting the + // tool call in the ACP stream. return { content: [ { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3d21f6f35..e86a6a0e4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -684,9 +684,6 @@ importers: '@types/jsonwebtoken': specifier: ^9.0.10 version: 9.0.10 - ajv: - specifier: ^8.17.1 - version: 8.17.1 commander: specifier: ^14.0.2 version: 14.0.3