From 6f7cf13ab9e1136185cd89570ada7f73bfe450ad Mon Sep 17 00:00:00 2001 From: Drew Stone Date: Thu, 14 May 2026 05:35:24 -0600 Subject: [PATCH] fix(trace-store): merge _update span patches on FileSystemTraceStore.load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a fresh FileSystemTraceStore opens a directory written by a prior process, load() calls appendSpan() for every NDJSON row — including the { ...patch, _update: true } fragments that updateSpan writes. Those fragments lack kind / runId / name / model, so the reader ended up with two spans per id (full row + fragment), breaking any consumer that re-opens the store cross-process (e.g. tax-agent's canonical eval OTLP converter, which then reported 0 spans to the trace analyst). The runs branch already handled this via appendRun-then-updateRun fallback. Mirror the same pattern for spans: detect \_update and route to updateSpan, merging into the prior span. New regression test reads the dir from a second FileSystemTraceStore instance and asserts the patch fields are applied without losing kind / runId / name. --- src/trace/store.ts | 20 +++++++++++++++++++- tests/trace-store.test.ts | 31 +++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/src/trace/store.ts b/src/trace/store.ts index 72dab0f..6349621 100644 --- a/src/trace/store.ts +++ b/src/trace/store.ts @@ -268,7 +268,25 @@ export class FileSystemTraceStore implements TraceStore { await store.updateRun(record.runId, record) } } else if (base === 'spans') { - await store.appendSpan(record) + // `updateSpan` appends an `_update: true` patch row instead of + // rewriting the original span. On reload we must collapse those + // patches into the prior span — otherwise a fresh + // FileSystemTraceStore reading the same dir reports duplicate + // spans (one full, one fragment with no runId/kind/name), which + // breaks any downstream consumer that re-opens the store + // cross-process (e.g. the canonical eval's OTLP converter). + if (record?._update) { + try { + await store.updateSpan(record.spanId, record) + } catch { + // Patch row arrived before the original — should not happen + // with locked append order, but fall through to append so we + // don't lose data. + await store.appendSpan(record) + } + } else { + await store.appendSpan(record) + } } else if (base === 'events') { await store.appendEvent(record) } else if (base === 'artifacts') { diff --git a/tests/trace-store.test.ts b/tests/trace-store.test.ts index e65ba41..ca227eb 100644 --- a/tests/trace-store.test.ts +++ b/tests/trace-store.test.ts @@ -139,6 +139,37 @@ describe('FileSystemTraceStore', () => { expect(spans).toHaveLength(1) expect(spans[0].endedAt).toBe(2) }) + + it('cross-process reload merges _update span patches into the original span — regression: a fresh FileSystemTraceStore opened on a written dir reported each span twice (the original row + the patch fragment with no runId/kind/name), which broke OTLP conversion downstream (canonical eval analyst saw 0 spans)', async () => { + const dir = await makeDir() + + // First process: write run, append a full LLM span, then patch it via updateSpan. + const writer = new FileSystemTraceStore({ dir }) + await writer.appendRun(makeRun('r1')) + await writer.appendSpan({ + runId: 'r1', + spanId: 's1', + kind: 'llm', + model: 'claude', + messages: [{ role: 'user', content: 'hi' }], + name: 'turn-1', + startedAt: 1, + }) + await writer.updateSpan('s1', { endedAt: 5, status: 'ok', output: 'merged-output' } as Partial) + + // Second process: fresh store reads the dir from scratch (forces load()). + const reader = new FileSystemTraceStore({ dir }) + const spans = await reader.spans({ runId: 'r1' }) + expect(spans).toHaveLength(1) + const [s] = spans + expect(s.spanId).toBe('s1') + expect(s.runId).toBe('r1') // must survive the merge (patch row has no runId) + expect(s.kind).toBe('llm') // must survive the merge (patch row has no kind) + expect(s.name).toBe('turn-1') // must survive the merge (patch row has no name) + expect(s.endedAt).toBe(5) // applied from patch + expect(s.status).toBe('ok') // applied from patch + expect((s as { output?: string }).output).toBe('merged-output') + }) }) describe('TraceEmitter', () => {