diff --git a/packages/agent/package.json b/packages/agent/package.json index 47ca765eb..4ffd80ec3 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -117,7 +117,6 @@ }, "dependencies": { "@agentclientprotocol/sdk": "0.19.0", - "ajv": "^8.17.1", "@anthropic-ai/claude-agent-sdk": "0.2.112", "@anthropic-ai/sdk": "0.89.0", "@hono/node-server": "^1.19.9", @@ -131,6 +130,7 @@ "hono": "^4.11.7", "jsonwebtoken": "^9.0.2", "minimatch": "^10.0.3", + "@modelcontextprotocol/sdk": "1.29.0", "tar": "^7.5.0", "uuid": "13.0.0", "yoga-wasm-web": "^0.3.3", diff --git a/packages/agent/src/adapters/acp-connection.ts b/packages/agent/src/adapters/acp-connection.ts index 085ca58be..e06e9a507 100644 --- a/packages/agent/src/adapters/acp-connection.ts +++ b/packages/agent/src/adapters/acp-connection.ts @@ -205,6 +205,7 @@ function createCodexConnection(config: AcpConnectionConfig): AcpConnection { codexProcessOptions: config.codexOptions ?? {}, processCallbacks: config.processCallbacks, posthogApiConfig: resolveEnricherApiConfig(config), + onStructuredOutput: config.onStructuredOutput, }); return agent; }, agentStream); diff --git a/packages/agent/src/adapters/codex/codex-agent.test.ts b/packages/agent/src/adapters/codex/codex-agent.test.ts index 559821ffa..092baf030 100644 --- a/packages/agent/src/adapters/codex/codex-agent.test.ts +++ b/packages/agent/src/adapters/codex/codex-agent.test.ts @@ -4,7 +4,7 @@ import type { LoadSessionResponse, NewSessionResponse, } from "@agentclientprotocol/sdk"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; const mockCodexConnection = { initialize: vi.fn(), @@ -60,7 +60,12 @@ describe("CodexAcpAgent", () => { vi.clearAllMocks(); }); - function createAgent(overrides: Partial = {}): { + function createAgent( + overrides: Partial = {}, + agentOptions?: { + onStructuredOutput?: (output: Record) => Promise; + }, + ): { agent: CodexAcpAgent; client: AgentSideConnection & { extNotification: ReturnType; @@ -80,6 +85,7 @@ describe("CodexAcpAgent", () => { codexProcessOptions: { cwd: process.cwd(), }, + onStructuredOutput: agentOptions?.onStructuredOutput, }); return { agent, client }; } @@ -295,6 +301,130 @@ describe("CodexAcpAgent", () => { ).resolves.toEqual({ stopReason: "end_turn" }); }); + describe("structured output injection", () => { + const schema = { + type: "object", + properties: { answer: { type: "string" } }, + required: ["answer"], + } as const; + + beforeEach(() => { + // The resolver insists the script path exists. Point at the node + // binary itself — always present, and the agent only forwards the + // path to codex-acp; nothing in this test actually spawns it. + vi.stubEnv("POSTHOG_STRUCTURED_OUTPUT_MCP_SCRIPT", process.execPath); + }); + + afterEach(() => { + vi.unstubAllEnvs(); + }); + + it("injects the create_output MCP server and system-prompt note when jsonSchema and callback are present", async () => { + const { agent } = createAgent({}, { onStructuredOutput: vi.fn() }); + mockCodexConnection.newSession.mockResolvedValue({ + sessionId: "session-1", + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + + await agent.newSession({ + cwd: process.cwd(), + mcpServers: [{ name: "existing", command: "echo", args: [], env: [] }], + _meta: { jsonSchema: schema, systemPrompt: "be terse." }, + } as never); + + const forwarded = mockCodexConnection.newSession.mock.calls[0][0] as { + mcpServers: Array<{ name: string; command: string; env: unknown }>; + _meta: { systemPrompt: string }; + }; + + // Existing MCP server is preserved; ours is appended. + expect(forwarded.mcpServers).toHaveLength(2); + expect(forwarded.mcpServers[0].name).toBe("existing"); + expect(forwarded.mcpServers[1].name).toBe("posthog_output"); + expect(forwarded.mcpServers[1].command).toBe(process.execPath); + + // The schema is forwarded base64-encoded so codex-acp doesn't have + // to escape it through a shell. + const envEntry = ( + forwarded.mcpServers[1].env as Array<{ name: string; value: string }> + ).find((e) => e.name === "POSTHOG_OUTPUT_SCHEMA"); + expect(envEntry).toBeDefined(); + const decoded = JSON.parse( + Buffer.from(envEntry?.value ?? "", "base64").toString("utf-8"), + ); + expect(decoded).toEqual(schema); + + // Existing systemPrompt is preserved with the structured-output + // instruction appended (not overwritten). + expect(forwarded._meta.systemPrompt.startsWith("be terse.")).toBe(true); + expect(forwarded._meta.systemPrompt).toContain("create_output"); + }); + + it("is a no-op when jsonSchema is absent", async () => { + const { agent } = createAgent({}, { onStructuredOutput: vi.fn() }); + mockCodexConnection.newSession.mockResolvedValue({ + sessionId: "session-1", + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + + await agent.newSession({ + cwd: process.cwd(), + mcpServers: [], + } as never); + + const forwarded = mockCodexConnection.newSession.mock.calls[0][0] as { + mcpServers: unknown[]; + _meta?: { systemPrompt?: string }; + }; + expect(forwarded.mcpServers).toEqual([]); + expect(forwarded._meta?.systemPrompt).toBeUndefined(); + }); + + it("is a no-op when onStructuredOutput callback is not wired", async () => { + const { agent } = createAgent(); + mockCodexConnection.newSession.mockResolvedValue({ + sessionId: "session-1", + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + + await agent.newSession({ + cwd: process.cwd(), + mcpServers: [], + _meta: { jsonSchema: schema }, + } as never); + + const forwarded = mockCodexConnection.newSession.mock.calls[0][0] as { + mcpServers: unknown[]; + }; + expect(forwarded.mcpServers).toEqual([]); + }); + + it("also injects on loadSession", async () => { + const { agent } = createAgent({}, { onStructuredOutput: vi.fn() }); + mockCodexConnection.loadSession.mockResolvedValue({ + modes: { currentModeId: "auto", availableModes: [] }, + configOptions: [], + } satisfies Partial); + + await agent.loadSession({ + sessionId: "session-1", + cwd: process.cwd(), + mcpServers: [], + _meta: { jsonSchema: schema }, + } as never); + + const forwarded = mockCodexConnection.loadSession.mock.calls[0][0] as { + mcpServers: Array<{ name: string }>; + }; + expect(forwarded.mcpServers.map((s) => s.name)).toContain( + "posthog_output", + ); + }); + }); + it("broadcasts user prompt as user_message_chunk before delegating to codex-acp", async () => { const { agent, client } = createAgent(); // Seed an active session so prompt() has the state it expects. diff --git a/packages/agent/src/adapters/codex/codex-agent.ts b/packages/agent/src/adapters/codex/codex-agent.ts index 9ed83ec57..0be25e1e7 100644 --- a/packages/agent/src/adapters/codex/codex-agent.ts +++ b/packages/agent/src/adapters/codex/codex-agent.ts @@ -9,6 +9,8 @@ * - System prompt injection */ +import { existsSync } from "node:fs"; +import { resolve as resolvePath } from "node:path"; import { type AgentSideConnection, type AuthenticateRequest, @@ -22,6 +24,7 @@ import { type LoadSessionRequest, type LoadSessionResponse, type McpServer, + type McpServerStdio, type NewSessionRequest, type NewSessionResponse, ndJsonStream, @@ -71,6 +74,15 @@ import { type CodexProcessOptions, spawnCodexProcess, } from "./spawn"; +import { + STRUCTURED_OUTPUT_MCP_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, +} from "./structured-output-constants"; + +export { + STRUCTURED_OUTPUT_MCP_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, +} from "./structured-output-constants"; interface NewSessionMeta { taskRunId?: string; @@ -85,12 +97,14 @@ interface NewSessionMeta { additionalRoots?: string[]; disableBuiltInTools?: boolean; allowedDomains?: string[]; + jsonSchema?: Record | null; } export interface CodexAcpAgentOptions { codexProcessOptions: CodexProcessOptions; processCallbacks?: ProcessSpawnedCallback; posthogApiConfig?: PostHogAPIConfig; + onStructuredOutput?: (output: Record) => Promise; } type CodexSession = BaseSession & { @@ -152,6 +166,49 @@ function getCurrentPermissionMode( return toCodexPermissionMode(fallbackMode); } +const STRUCTURED_OUTPUT_INSTRUCTIONS = `\n\nWhen you have completed the task, call the \`${STRUCTURED_OUTPUT_TOOL_NAME}\` tool with the final structured result. The tool's input schema matches the required output format for this task. Do not describe the result in a plain message — submitting it via the tool is required for the task to be considered complete.`; + +/** + * Builds the stdio MCP server config that exposes the `create_output` tool. + * The child process validates tool input against the JSON schema with AJV. + * We pass the schema as a base64-encoded env var to avoid shell escaping. + * + * Path resolves relative to the compiled adapter location. When bundled into + * different entry points (dist/agent.js, dist/server/bin.cjs, dist/server/ + * harness/bin.js, etc), `import.meta.dirname` sits at different depths. Walk + * up until we find the script so each bundle locates the shared dist asset. + */ +function resolveStructuredOutputMcpScript(): string { + const override = process.env.POSTHOG_STRUCTURED_OUTPUT_MCP_SCRIPT; + if (override && existsSync(override)) return override; + + const rel = "adapters/codex/structured-output-mcp-server.js"; + let dir = import.meta.dirname ?? __dirname; + for (let i = 0; i < 5; i++) { + const candidate = resolvePath(dir, rel); + if (existsSync(candidate)) return candidate; + dir = resolvePath(dir, ".."); + } + throw new Error( + `Could not locate ${rel} relative to ${import.meta.dirname ?? __dirname}. Set POSTHOG_STRUCTURED_OUTPUT_MCP_SCRIPT to override.`, + ); +} + +function buildStructuredOutputMcpServer( + jsonSchema: Record, +): McpServerStdio { + const scriptPath = resolveStructuredOutputMcpScript(); + const schemaBase64 = Buffer.from(JSON.stringify(jsonSchema)).toString( + "base64", + ); + return { + name: STRUCTURED_OUTPUT_MCP_NAME, + command: process.execPath, + args: [scriptPath], + env: [{ name: "POSTHOG_OUTPUT_SCHEMA", value: schemaBase64 }], + }; +} + export class CodexAcpAgent extends BaseAcpAgent { readonly adapterName = "codex"; declare session: CodexSession; @@ -171,6 +228,9 @@ export class CodexAcpAgent extends BaseAcpAgent { private promptMutex: Promise = Promise.resolve(); private readonly codexProcessOptions: CodexProcessOptions; private readonly processCallbacks?: ProcessSpawnedCallback; + private readonly onStructuredOutput?: ( + output: Record, + ) => Promise; // Snapshot of the initialize() request so refreshSession can replay the // same handshake against a respawned codex-acp subprocess. private lastInitRequest?: InitializeRequest; @@ -187,6 +247,7 @@ export class CodexAcpAgent extends BaseAcpAgent { this.codexProcessOptions = options.codexProcessOptions; this.processCallbacks = options.processCallbacks; + this.onStructuredOutput = options.onStructuredOutput; // Spawn the codex-acp subprocess this.codexProcess = spawnCodexProcess({ @@ -221,6 +282,7 @@ export class CodexAcpAgent extends BaseAcpAgent { (_agent) => createCodexClient(this.client, this.logger, this.sessionState, { enrichmentDeps: this.enrichment?.deps, + onStructuredOutput: this.onStructuredOutput, }), codexStream, ); @@ -264,7 +326,8 @@ export class CodexAcpAgent extends BaseAcpAgent { const meta = params._meta as NewSessionMeta | undefined; const requestedPermissionMode = toCodexPermissionMode(meta?.permissionMode); - const response = await this.codexConnection.newSession(params); + const injectedParams = this.applyStructuredOutput(params, meta); + const response = await this.codexConnection.newSession(injectedParams); // Initialize session state this.sessionState = createSessionState(response.sessionId, params.cwd, { @@ -301,8 +364,9 @@ export class CodexAcpAgent extends BaseAcpAgent { } async loadSession(params: LoadSessionRequest): Promise { - const response = await this.codexConnection.loadSession(params); const meta = params._meta as NewSessionMeta | undefined; + const injectedParams = this.applyStructuredOutput(params, meta); + const response = await this.codexConnection.loadSession(injectedParams); const currentPermissionMode = getCurrentPermissionMode( response.modes?.currentModeId, meta?.permissionMode, @@ -335,14 +399,19 @@ export class CodexAcpAgent extends BaseAcpAgent { async unstable_resumeSession( params: ResumeSessionRequest, ): Promise { - // codex-acp doesn't support resume natively, use loadSession instead - const loadResponse = await this.codexConnection.loadSession({ - sessionId: params.sessionId, - cwd: params.cwd, - mcpServers: params.mcpServers ?? [], - }); - const meta = params._meta as NewSessionMeta | undefined; + const injectedParams = this.applyStructuredOutput( + { + sessionId: params.sessionId, + cwd: params.cwd, + mcpServers: params.mcpServers ?? [], + _meta: params._meta, + }, + meta, + ); + + // codex-acp doesn't support resume natively, use loadSession instead + const loadResponse = await this.codexConnection.loadSession(injectedParams); const currentPermissionMode = getCurrentPermissionMode( loadResponse.modes?.currentModeId, meta?.permissionMode, @@ -374,14 +443,19 @@ export class CodexAcpAgent extends BaseAcpAgent { async unstable_forkSession( params: ForkSessionRequest, ): Promise { + const meta = params._meta as NewSessionMeta | undefined; + const injectedParams = this.applyStructuredOutput( + { + cwd: params.cwd, + mcpServers: params.mcpServers ?? [], + _meta: params._meta, + }, + meta, + ); + // Create a new session via codex-acp (fork isn't natively supported) - const newResponse = await this.codexConnection.newSession({ - cwd: params.cwd, - mcpServers: params.mcpServers ?? [], - _meta: params._meta, - }); + const newResponse = await this.codexConnection.newSession(injectedParams); - const meta = params._meta as NewSessionMeta | undefined; const requestedPermissionMode = toCodexPermissionMode(meta?.permissionMode); this.sessionState = createSessionState(newResponse.sessionId, params.cwd, { taskRunId: meta?.taskRunId, @@ -401,6 +475,38 @@ export class CodexAcpAgent extends BaseAcpAgent { return newResponse; } + /** + * When the caller wires up `onStructuredOutput` and provides a JSON schema + * via `_meta.jsonSchema`, inject the stdio MCP server that exposes + * `create_output` and append instructions telling the model to use it. + * + * Codex has no native equivalent of Claude's `outputFormat`, so we lean on + * MCP tool-calling to get validated structured output back. + */ + private applyStructuredOutput< + T extends { mcpServers?: McpServer[]; _meta?: unknown }, + >(request: T, meta: NewSessionMeta | undefined): T { + if (!meta?.jsonSchema || !this.onStructuredOutput) { + return request; + } + + const mcpServer = buildStructuredOutputMcpServer(meta.jsonSchema); + const existingMeta = (request._meta ?? {}) as Record; + const existingSystemPrompt = + typeof existingMeta.systemPrompt === "string" + ? existingMeta.systemPrompt + : ""; + + return { + ...request, + mcpServers: [...(request.mcpServers ?? []), mcpServer], + _meta: { + ...existingMeta, + systemPrompt: existingSystemPrompt + STRUCTURED_OUTPUT_INSTRUCTIONS, + }, + }; + } + private async applyInitialPermissionMode( sessionId: string, permissionMode?: string, @@ -617,7 +723,9 @@ export class CodexAcpAgent extends BaseAcpAgent { const newAbortController = new AbortController(); const newConnection = new ClientSideConnection( (_agent) => - createCodexClient(this.client, this.logger, this.sessionState), + createCodexClient(this.client, this.logger, this.sessionState, { + onStructuredOutput: this.onStructuredOutput, + }), codexStream, ); diff --git a/packages/agent/src/adapters/codex/codex-client.test.ts b/packages/agent/src/adapters/codex/codex-client.test.ts index c012a2b4b..68ea50ad7 100644 --- a/packages/agent/src/adapters/codex/codex-client.test.ts +++ b/packages/agent/src/adapters/codex/codex-client.test.ts @@ -2,6 +2,7 @@ import type { AgentSideConnection, ReadTextFileRequest, ReadTextFileResponse, + SessionNotification, } from "@agentclientprotocol/sdk"; import { describe, expect, test, vi } from "vitest"; import type { FileEnrichmentDeps } from "../../enrichment/file-enricher"; @@ -110,3 +111,180 @@ describe("createCodexClient readTextFile", () => { expect(upstream.readTextFile).toHaveBeenCalledWith(params); }); }); + +describe("createCodexClient onStructuredOutput", () => { + const logger = new Logger({ debug: false, prefix: "[test]" }); + const sessionState = createSessionState("sess", "/tmp"); + + function makeUpstream(): AgentSideConnection { + return { + sessionUpdate: vi.fn(async () => {}), + requestPermission: vi.fn(), + readTextFile: vi.fn(), + writeTextFile: vi.fn(), + createTerminal: vi.fn(), + terminalOutput: vi.fn(), + releaseTerminal: vi.fn(), + waitForTerminalExit: vi.fn(), + killTerminal: vi.fn(), + extMethod: vi.fn(), + extNotification: vi.fn(), + } as unknown as AgentSideConnection; + } + + function notification(update: Record): SessionNotification { + return { + sessionId: "sess", + update, + } as unknown as SessionNotification; + } + + test("fires once when create_output completes after rawInput arrived", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "in_progress", + rawInput: { result: "ok", count: 5 }, + }), + ); + expect(onStructuredOutput).not.toHaveBeenCalled(); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call_update", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + }), + ); + + expect(onStructuredOutput).toHaveBeenCalledTimes(1); + expect(onStructuredOutput).toHaveBeenCalledWith({ result: "ok", count: 5 }); + }); + + test("matches mcp__-prefixed tool titles", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "mcp__posthog_output__create_output", + status: "completed", + rawInput: { ok: true }, + }), + ); + + expect(onStructuredOutput).toHaveBeenCalledWith({ ok: true }); + }); + + test("ignores tool calls that aren't create_output", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "Read", + status: "completed", + rawInput: { path: "/tmp/x" }, + }), + ); + + expect(onStructuredOutput).not.toHaveBeenCalled(); + }); + + test("does not fire when rawInput never arrived", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + }), + ); + + expect(onStructuredOutput).not.toHaveBeenCalled(); + }); + + test("does not fire twice if completed is re-emitted for the same tool call", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + const completed = notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + rawInput: { final: 1 }, + }); + + await client.sessionUpdate?.(completed); + await client.sessionUpdate?.(completed); + + expect(onStructuredOutput).toHaveBeenCalledTimes(1); + }); + + test("forwards the notification upstream regardless of structured-output handling", async () => { + const onStructuredOutput = vi.fn(async () => {}); + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState, { + onStructuredOutput, + }); + + const note = notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + rawInput: { final: 1 }, + }); + await client.sessionUpdate?.(note); + + expect(upstream.sessionUpdate).toHaveBeenCalledWith(note); + }); + + test("does nothing when the callback is not wired", async () => { + const upstream = makeUpstream(); + const client = createCodexClient(upstream, logger, sessionState); + + // No onStructuredOutput configured — must not throw and must still + // forward upstream. + await client.sessionUpdate?.( + notification({ + sessionUpdate: "tool_call", + toolCallId: "tc-1", + title: "create_output", + status: "completed", + rawInput: { x: 1 }, + }), + ); + + expect(upstream.sessionUpdate).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/agent/src/adapters/codex/codex-client.ts b/packages/agent/src/adapters/codex/codex-client.ts index 727d8876b..ebe05a4fe 100644 --- a/packages/agent/src/adapters/codex/codex-client.ts +++ b/packages/agent/src/adapters/codex/codex-client.ts @@ -36,12 +36,45 @@ import { import type { PermissionMode } from "../../execution-mode"; import type { Logger } from "../../utils/logger"; import type { CodexSessionState } from "./session-state"; +import { + STRUCTURED_OUTPUT_MCP_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, +} from "./structured-output-constants"; export interface CodexClientCallbacks { /** Called when a usage_update session notification is received */ onUsageUpdate?: (update: Record) => void; /** When set, Read responses are annotated with PostHog enrichment before reaching codex-acp. */ enrichmentDeps?: FileEnrichmentDeps; + /** + * Called once per session when the agent completes the injected + * `create_output` MCP tool. Matches the Claude adapter's structured + * output delivery. + */ + onStructuredOutput?: (output: Record) => Promise; +} + +/** + * Tool calls for our injected MCP server surface in ACP `tool_call` / + * `tool_call_update` notifications. The `title` from codex-acp can be + * either the bare tool name or prefixed (`mcp____`); match + * both forms but require the server name on prefixed titles so an unrelated + * user tool happening to contain `create_output` doesn't trigger us. + */ +function isStructuredOutputToolCall(title: string | undefined | null): boolean { + if (!title) return false; + if (title === STRUCTURED_OUTPUT_TOOL_NAME) return true; + return ( + title.includes(STRUCTURED_OUTPUT_MCP_NAME) && + title.includes(STRUCTURED_OUTPUT_TOOL_NAME) + ); +} + +function toRecord(value: unknown): Record | null { + if (value && typeof value === "object" && !Array.isArray(value)) { + return value as Record; + } + return null; } const AUTO_APPROVED_KINDS: Record> = { @@ -96,6 +129,14 @@ export function createCodexClient( callbacks?: CodexClientCallbacks, ): Client { const terminalHandles = new Map(); + // Track rawInput across tool_call → tool_call_update → completed so we can + // fire onStructuredOutput exactly once per tool call id. Entries stay in + // the map after firing with `fired: true` so a re-emitted completion + // (if codex-acp ever resends one) is a no-op. + const structuredOutputState = new Map< + string, + { rawInput?: Record; fired: boolean } + >(); return { async requestPermission( @@ -125,6 +166,33 @@ export function createCodexClient( async sessionUpdate(params: SessionNotification): Promise { const update = params.update as Record | undefined; + + if ( + callbacks?.onStructuredOutput && + (update?.sessionUpdate === "tool_call" || + update?.sessionUpdate === "tool_call_update") + ) { + const toolCallId = update.toolCallId as string | undefined; + const title = update.title as string | undefined; + if (toolCallId && isStructuredOutputToolCall(title)) { + const entry = structuredOutputState.get(toolCallId) ?? { + fired: false, + }; + const rawInput = toRecord(update.rawInput); + if (rawInput) entry.rawInput = rawInput; + structuredOutputState.set(toolCallId, entry); + + if (update.status === "completed" && !entry.fired && entry.rawInput) { + entry.fired = true; + try { + await callbacks.onStructuredOutput(entry.rawInput); + } catch (err) { + logger.warn("onStructuredOutput callback threw", { error: err }); + } + } + } + } + if (update?.sessionUpdate === "usage_update") { const used = update.used as number | undefined; const size = update.size as number | undefined; diff --git a/packages/agent/src/adapters/codex/structured-output-constants.ts b/packages/agent/src/adapters/codex/structured-output-constants.ts new file mode 100644 index 000000000..739e2a775 --- /dev/null +++ b/packages/agent/src/adapters/codex/structured-output-constants.ts @@ -0,0 +1,9 @@ +/** + * Shared identifiers for the injected structured-output MCP server. + * Imported by codex-agent.ts (server config), codex-client.ts (tool-call + * matching), and structured-output-mcp-server.ts (tool registration) so the + * three stay in sync. + */ + +export const STRUCTURED_OUTPUT_MCP_NAME = "posthog_output"; +export const STRUCTURED_OUTPUT_TOOL_NAME = "create_output"; diff --git a/packages/agent/src/adapters/codex/structured-output-mcp-server.ts b/packages/agent/src/adapters/codex/structured-output-mcp-server.ts new file mode 100644 index 000000000..425b5ae08 --- /dev/null +++ b/packages/agent/src/adapters/codex/structured-output-mcp-server.ts @@ -0,0 +1,72 @@ +/** + * Standalone stdio MCP server for structured output in the Codex adapter. + * + * Spawned by codex-acp as an MCP server process. Reads the JSON schema + * from the POSTHOG_OUTPUT_SCHEMA env var (base64-encoded) and registers + * a tool whose Zod shape McpServer.tool() validates on each call. + * + * Usage: + * POSTHOG_OUTPUT_SCHEMA= node structured-output-mcp-server.js + */ + +import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; +import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; +import { z } from "zod"; +import { + STRUCTURED_OUTPUT_MCP_NAME, + STRUCTURED_OUTPUT_TOOL_NAME, +} from "./structured-output-constants"; + +function die(message: string): never { + process.stderr.write(`[structured-output-mcp-server] ${message}\n`); + process.exit(1); +} + +const schemaEnv = process.env.POSTHOG_OUTPUT_SCHEMA; +if (!schemaEnv) { + die("POSTHOG_OUTPUT_SCHEMA env var is required"); +} + +let jsonSchema: Record; +try { + jsonSchema = JSON.parse(Buffer.from(schemaEnv, "base64").toString("utf-8")); +} catch (err) { + die(`Failed to parse POSTHOG_OUTPUT_SCHEMA as base64-encoded JSON: ${err}`); +} + +const zodType = z.fromJSONSchema(jsonSchema); +if (!(zodType instanceof z.ZodObject)) { + die( + `POSTHOG_OUTPUT_SCHEMA must describe a JSON object schema (got ${zodType.constructor.name})`, + ); +} +// McpServer.tool() expects a mutable ZodRawShape +const zodShape = { ...zodType.shape } as z.ZodRawShape; + +const server = new McpServer({ + name: STRUCTURED_OUTPUT_MCP_NAME, + version: "1.0.0", +}); + +server.tool( + STRUCTURED_OUTPUT_TOOL_NAME, + "Submit the structured output for this task. Call this tool with the required fields to deliver your final result.", + zodShape, + async () => { + // McpServer.tool() validates `args` against `zodShape` before invoking + // this handler, so reaching this point means the input is valid. The + // parent process captures the validated output by intercepting the + // tool call in the ACP stream. + return { + content: [ + { + type: "text" as const, + text: "Output submitted successfully.", + }, + ], + }; + }, +); + +const transport = new StdioServerTransport(); +await server.connect(transport); diff --git a/packages/agent/tsup.config.ts b/packages/agent/tsup.config.ts index a21a206b5..130344cea 100644 --- a/packages/agent/tsup.config.ts +++ b/packages/agent/tsup.config.ts @@ -86,6 +86,7 @@ export default defineConfig([ "src/adapters/claude/session/models.ts", "src/adapters/codex/models.ts", "src/adapters/claude/mcp/tool-metadata.ts", + "src/adapters/codex/structured-output-mcp-server.ts", "src/adapters/reasoning-effort.ts", "src/execution-mode.ts", "src/server/schemas.ts", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cf9657e97..e86a6a0e4 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -261,7 +261,7 @@ importers: version: 17.2.3 drizzle-orm: specifier: ^0.45.1 - version: 0.45.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.8.0)(bun-types@1.3.12) + version: 0.45.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.8.0)(bun-types@1.3.13) electron-log: specifier: ^5.4.3 version: 5.4.3 @@ -663,6 +663,9 @@ importers: '@hono/node-server': specifier: ^1.19.9 version: 1.19.9(hono@4.11.7) + '@modelcontextprotocol/sdk': + specifier: 1.29.0 + version: 1.29.0(zod@4.3.6) '@opentelemetry/api-logs': specifier: ^0.208.0 version: 0.208.0 @@ -681,9 +684,6 @@ importers: '@types/jsonwebtoken': specifier: ^9.0.10 version: 9.0.10 - ajv: - specifier: ^8.17.1 - version: 8.17.1 commander: specifier: ^14.0.2 version: 14.0.3 @@ -720,7 +720,7 @@ importers: version: link:../shared '@types/bun': specifier: latest - version: 1.3.12 + version: 1.3.13 '@types/tar': specifier: ^6.1.13 version: 6.1.13 @@ -5184,8 +5184,8 @@ packages: '@types/better-sqlite3@7.6.13': resolution: {integrity: sha512-NMv9ASNARoKksWtsq/SHakpYAYnhBrQgGD8zkLYk/jaK8jUGn08CfEdTRgYhMypUQAfzSP8W6gNLe0q19/t4VA==} - '@types/bun@1.3.12': - resolution: {integrity: sha512-DBv81elK+/VSwXHDlnH3Qduw+KxkTIWi7TXkAeh24zpi5l0B2kUg9Ga3tb4nJaPcOFswflgi/yAvMVBPrxMB+A==} + '@types/bun@1.3.13': + resolution: {integrity: sha512-9fqXWk5YIHGGnUau9TEi+qdlTYDAnOj+xLCmSTwXfAIqXr2x4tytJb43E9uCvt09zJURKXwAtkoH4nLQfzeTXw==} '@types/cacheable-request@6.0.3': resolution: {integrity: sha512-IQ3EbTzGxIigb1I3qPZc1rWJnH0BmSKv5QYTalEwweFvyBDLSAe24zP0le/hyi7ecGfZVlIVAg4BZqb8WBwKqw==} @@ -5916,8 +5916,8 @@ packages: buffer@6.0.3: resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==} - bun-types@1.3.12: - resolution: {integrity: sha512-HqOLj5PoFajAQciOMRiIZGNoKxDJSr6qigAttOX40vJuSp6DN/CxWp9s3C1Xwm4oH7ybueITwiaOcWXoYVoRkA==} + bun-types@1.3.13: + resolution: {integrity: sha512-QXKeHLlOLqQX9LgYaHJfzdBaV21T63HhFJnvuRCcjZiaUDpbs5ED1MgxbMra71CsryN/1dAoXuJJJwIv/2drVA==} bundle-name@4.1.0: resolution: {integrity: sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q==} @@ -14779,7 +14779,7 @@ snapshots: react: 19.1.0 react-dom: 19.1.0(react@19.1.0) - '@modelcontextprotocol/sdk@1.27.1(zod@3.25.76)': + '@modelcontextprotocol/sdk@1.27.1(zod@4.3.6)': dependencies: '@hono/node-server': 1.19.9(hono@4.11.7) ajv: 8.17.1 @@ -14796,12 +14796,12 @@ snapshots: json-schema-typed: 8.0.2 pkce-challenge: 5.0.1 raw-body: 3.0.2 - zod: 3.25.76 - zod-to-json-schema: 3.25.1(zod@3.25.76) + zod: 4.3.6 + zod-to-json-schema: 3.25.1(zod@4.3.6) transitivePeerDependencies: - supports-color - '@modelcontextprotocol/sdk@1.27.1(zod@4.3.6)': + '@modelcontextprotocol/sdk@1.29.0(zod@3.25.76)': dependencies: '@hono/node-server': 1.19.9(hono@4.11.7) ajv: 8.17.1 @@ -14818,8 +14818,8 @@ snapshots: json-schema-typed: 8.0.2 pkce-challenge: 5.0.1 raw-body: 3.0.2 - zod: 4.3.6 - zod-to-json-schema: 3.25.1(zod@4.3.6) + zod: 3.25.76 + zod-to-json-schema: 3.25.1(zod@3.25.76) transitivePeerDependencies: - supports-color @@ -16906,9 +16906,9 @@ snapshots: dependencies: '@types/node': 24.12.0 - '@types/bun@1.3.12': + '@types/bun@1.3.13': dependencies: - bun-types: 1.3.12 + bun-types: 1.3.13 '@types/cacheable-request@6.0.3': dependencies: @@ -17788,7 +17788,7 @@ snapshots: base64-js: 1.5.1 ieee754: 1.2.1 - bun-types@1.3.12: + bun-types@1.3.13: dependencies: '@types/node': 24.12.0 @@ -18387,12 +18387,12 @@ snapshots: transitivePeerDependencies: - supports-color - drizzle-orm@0.45.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.8.0)(bun-types@1.3.12): + drizzle-orm@0.45.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.8.0)(bun-types@1.3.13): optionalDependencies: '@opentelemetry/api': 1.9.0 '@types/better-sqlite3': 7.6.13 better-sqlite3: 12.8.0 - bun-types: 1.3.12 + bun-types: 1.3.13 ds-store@0.1.6: dependencies: @@ -23012,7 +23012,7 @@ snapshots: '@babel/plugin-transform-typescript': 7.28.6(@babel/core@7.29.0) '@babel/preset-typescript': 7.28.5(@babel/core@7.29.0) '@dotenvx/dotenvx': 1.60.1 - '@modelcontextprotocol/sdk': 1.27.1(zod@3.25.76) + '@modelcontextprotocol/sdk': 1.29.0(zod@3.25.76) '@types/validate-npm-package-name': 4.0.2 browserslist: 4.28.1 commander: 14.0.3