From 56cc509a408126878d0621b0195e9e969924173d Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Wed, 3 Jun 2026 15:17:40 +0200 Subject: [PATCH 1/7] fix(workflow-executor): drop dead-branch steps from context (PRD-433) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Revising a step forks the run history: the orchestrator stamps the pivot card revised and the downstream branch cancelled, then re-appends live clones. The executor read that history branch-blind, so re-executed steps inherited stale AI context and the record pool offered records loaded by superseded steps (e.g. revising "Load store" proposed loading from the dead branch's owner). previousSteps now mirrors the orchestrator's own live-path filter, and each step carries lineageStepIndexes so executions persisted in the RunStore under a clone's original index resolve to the freshest generation — never to a dead branch, never by stepName (LinkTo loops make names non-unique on the live path). Co-Authored-By: Claude Opus 4.8 --- packages/workflow-executor/CLAUDE.md | 1 + .../adapters/run-to-available-step-mapper.ts | 25 +- .../src/adapters/server-types.ts | 2 + .../src/executors/base-step-executor.ts | 24 +- .../src/executors/record-step-executor.ts | 18 +- .../src/types/validated/execution.ts | 5 + .../run-to-available-step-mapper.test.ts | 183 ++++++++++++ .../test/executors/base-step-executor.test.ts | 120 ++++++++ .../load-related-record-step-executor.test.ts | 82 +++++- .../read-record-step-executor.test.ts | 271 +++++++++++++++++- ...rigger-record-action-step-executor.test.ts | 27 +- .../update-record-step-executor.test.ts | 26 +- 12 files changed, 764 insertions(+), 20 deletions(-) diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 9a3dabe201..11ae0bc427 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -94,6 +94,7 @@ src/ - **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI. - **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class. - **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor. +- **Revision-aware history reads** — When a user revises a step, the orchestrator stamps the pivot card `revised: true` and every later entry `cancelled: true`, then appends clones of the still-valid card sub-steps (with `originalStepIndex` chaining to the FIRST original) plus the re-executed step. The live path is `filter(!revised && !cancelled)` — `toPreviousSteps` applies it, mirroring the orchestrator's own reads. Because clones run under a new `stepIndex` while their RunStore execution data stays keyed under the original one, each previous step carries `lineageStepIndexes` (own index first, then earlier generations descending, rebuilt by the mapper from the full history including dead entries). `BaseStepExecutor.resolveLineageExecution` walks these candidates freshest-first; both the record pool (`getAvailableRecordRefs`) and the previous-steps summary use it. Never key lookups on `stepName` — LinkTo loops can legitimately put the same step name on the live path twice. - **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction. ## Commands diff --git a/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts b/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts index 02c2d51f4d..20824424ec 100644 --- a/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts +++ b/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts @@ -76,11 +76,12 @@ function toStepOutcome(s: ServerStepHistory): StepOutcome { return { type: 'record', ...baseFromCtx, status } satisfies RecordStepOutcome; } -function tryMapStep(s: ServerStepHistory): Step | null { +function tryMapStep(s: ServerStepHistory, lineageStepIndexes: [number, ...number[]]): Step | null { try { return { stepDefinition: toStepDefinition(s.stepDefinition), stepOutcome: toStepOutcome(s), + lineageStepIndexes, }; } catch (err) { // Sub-workflow navigation steps (start-sub-workflow, close-sub-workflow) are not @@ -90,13 +91,31 @@ function tryMapStep(s: ServerStepHistory): Step | null { } } +// All generations of the same logical step share the same root (originalStepIndex chains to +// the FIRST original), so the lineage is rebuilt by grouping the FULL history — dead entries +// included, since they carry the intermediate generations. Own index first, then older ones. +function toLineageStepIndexes( + history: ServerStepHistory[], + step: ServerStepHistory, +): [number, ...number[]] { + const root = step.originalStepIndex ?? step.stepIndex; + const generations = history + .filter(s => (s.originalStepIndex ?? s.stepIndex) === root && s.stepIndex <= step.stepIndex) + .map(s => s.stepIndex) + .sort((a, b) => b - a); + + return generations as [number, ...number[]]; +} + +// Mirrors the orchestrator's own read filter: revised (pivot anchor) and cancelled (dead +// branch) entries are not on the live path and must not reach the AI context. function toPreviousSteps( history: ServerStepHistory[], pendingStepIndex: number, ): ReadonlyArray { return history - .filter(s => s.done && s.stepIndex < pendingStepIndex) - .map(s => tryMapStep(s)) + .filter(s => s.done && !s.revised && !s.cancelled && s.stepIndex < pendingStepIndex) + .map(s => tryMapStep(s, toLineageStepIndexes(history, s))) .filter((s): s is Step => s !== null); } diff --git a/packages/workflow-executor/src/adapters/server-types.ts b/packages/workflow-executor/src/adapters/server-types.ts index de4960b388..a334fa883a 100644 --- a/packages/workflow-executor/src/adapters/server-types.ts +++ b/packages/workflow-executor/src/adapters/server-types.ts @@ -169,6 +169,8 @@ export interface ServerStepHistory { done: boolean; revised?: boolean; cancelled?: boolean; + // Set on revision clones and re-executions; chains to the FIRST original across revisions. + originalStepIndex?: number; context?: Record; childrenWorkflowId?: string; stepDefinition: ServerWorkflowStep; diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index fb91e1549f..c76b9bfb8f 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -6,6 +6,7 @@ import type { StepExecutionResult, } from '../types/execution-context'; import type { ConfirmableStepExecutionData, StepExecutionData } from '../types/step-execution-data'; +import type { Step } from '../types/validated/execution'; import type { StepDefinition } from '../types/validated/step-definition'; import type { StepStatus } from '../types/validated/step-outcome'; import type { @@ -283,16 +284,33 @@ export default abstract class BaseStepExecutor { - const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex); + .map(step => { + const execution = BaseStepExecutor.resolveLineageExecution(step, allStepExecutions); - return StepSummaryBuilder.build(stepDefinition, stepOutcome, execution); + return StepSummaryBuilder.build(step.stepDefinition, step.stepOutcome, execution); }) .join('\n\n'); return [new SystemMessage(summary)]; } + // Revision clones run under a new stepIndex while their execution data stays keyed under the + // original one. Lineage candidates are ordered freshest-first, so the first hit is the most + // recent execution of that logical step. + protected static resolveLineageExecution( + step: Step, + executions: StepExecutionData[], + ): StepExecutionData | undefined { + const candidates = step.lineageStepIndexes ?? [step.stepOutcome.stepIndex]; + + for (const stepIndex of candidates) { + const execution = executions.find(e => e.stepIndex === stepIndex); + if (execution) return execution; + } + + return undefined; + } + private static mergeLeadingSystemMessages(messages: BaseMessage[]): BaseMessage[] { let i = 0; while (i < messages.length && messages[i] instanceof SystemMessage) i += 1; diff --git a/packages/workflow-executor/src/executors/record-step-executor.ts b/packages/workflow-executor/src/executors/record-step-executor.ts index 4430dfacc4..52d18ead78 100644 --- a/packages/workflow-executor/src/executors/record-step-executor.ts +++ b/packages/workflow-executor/src/executors/record-step-executor.ts @@ -8,6 +8,7 @@ import { z } from 'zod'; import { InvalidAIResponseError, InvalidPreRecordedArgsError, NoRecordsError } from '../errors'; import BaseStepExecutor from './base-step-executor'; +import { StepType } from '../types/validated/step-definition'; export default abstract class RecordStepExecutor< TStep extends StepDefinition = StepDefinition, @@ -46,15 +47,22 @@ export default abstract class RecordStepExecutor< return this.selectRecordRef(records, prompt); } + // The pool is scoped to the live path: records are derived from previousSteps (already + // cleaned of revised/cancelled entries), not from the raw RunStore — executions persisted + // by dead-branch steps must not offer their records to re-executed steps. protected async getAvailableRecordRefs(): Promise { const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); - const relatedRecords = stepExecutions.flatMap(e => { + const relatedRecords = this.context.previousSteps.flatMap(step => { + if (step.stepDefinition.type !== StepType.LoadRelatedRecord) return []; + + const execution = BaseStepExecutor.resolveLineageExecution(step, stepExecutions); + if ( - e.type === 'load-related-record' && - e.executionResult !== undefined && - 'record' in e.executionResult + execution?.type === 'load-related-record' && + execution.executionResult !== undefined && + 'record' in execution.executionResult ) { - return [e.executionResult.record]; + return [execution.executionResult.record]; } return []; diff --git a/packages/workflow-executor/src/types/validated/execution.ts b/packages/workflow-executor/src/types/validated/execution.ts index 43b5c0c06a..a3fd6ec123 100644 --- a/packages/workflow-executor/src/types/validated/execution.ts +++ b/packages/workflow-executor/src/types/validated/execution.ts @@ -25,6 +25,11 @@ export const StepSchema = z .object({ stepDefinition: StepDefinitionSchema, stepOutcome: StepOutcomeSchema, + // RunStore execution lookup candidates, own stepIndex first then earlier same-step + // generations descending. Revision clones run under a new stepIndex while their execution + // data stays keyed under the original one — consumers walk these until an execution is + // found. Absent (legacy producers) means "own index only". + lineageStepIndexes: z.array(z.number().int().nonnegative()).nonempty().optional(), }) .strict(); export type Step = z.infer; diff --git a/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts b/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts index 8919712a43..51e59a5670 100644 --- a/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts +++ b/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts @@ -501,6 +501,189 @@ describe('toAvailableStepExecution', () => { }); }); + describe('revision handling', () => { + // Revision model (orchestrator): the pivot card is stamped revised:true, every step after + // it is stamped cancelled:true, and clones of the still-valid card sub-steps are appended + // at the tail with originalStepIndex chaining to the FIRST original. The live path is + // filter(!revised && !cancelled). + + function makeClonedStepHistory( + overrides: Partial, + originalStepIndex: number, + ): ServerStepHistory { + return { ...makeStepHistory(overrides), originalStepIndex }; + } + + /** + * Canonical single-revision scenario (sub B1 of card B revised): + * idx 0 trunk task done ← live + * idx 1 card B done, revised ← pivot anchor (dead) + * idx 2 sub B1 done, cancelled ← dead branch + * idx 3 card C done, cancelled ← dead branch + * idx 4 card B clone done, originalStepIndex 1 ← live + * idx 5 sub B1 re-exec pending ← current + */ + function makeRevisedRun(): ServerHydratedWorkflowRun { + return makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 'trunk', stepIndex: 0, done: true }), + makeStepHistory({ stepName: 'card-b', stepIndex: 1, done: true, revised: true }), + makeStepHistory({ stepName: 'sub-b1', stepIndex: 2, done: true, cancelled: true }), + makeStepHistory({ stepName: 'card-c', stepIndex: 3, done: true, cancelled: true }), + makeClonedStepHistory({ stepName: 'card-b', stepIndex: 4, done: true }, 1), + makeClonedStepHistory({ stepName: 'sub-b1', stepIndex: 5, done: false }, 2), + ], + }); + } + + it('excludes cancelled steps from previousSteps', () => { + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }), + makeStepHistory({ stepName: 's1', stepIndex: 1, done: true, cancelled: true }), + makeStepHistory({ stepName: 's2', stepIndex: 2, done: false }), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toHaveLength(1); + expect(result?.previousSteps[0].stepOutcome.stepId).toBe('s0'); + }); + + it('excludes the revised anchor while keeping its live clone', () => { + const result = toAvailableStepExecution(makeRevisedRun()); + + const indexes = result?.previousSteps.map(s => s.stepOutcome.stepIndex); + expect(indexes).toEqual([0, 4]); + }); + + it('returns empty previousSteps when the entry point is revised', () => { + // Entry-point revision: the pivot IS the first step, so nothing valid precedes the + // re-execution — context must collapse to a clean slate. + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 'entry', stepIndex: 0, done: true, revised: true }), + makeStepHistory({ stepName: 's1', stepIndex: 1, done: true, cancelled: true }), + makeClonedStepHistory({ stepName: 'entry', stepIndex: 2, done: false }, 0), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toEqual([]); + }); + + it('does not attempt to map dead-branch steps (filtering precedes mapping)', () => { + // A cancelled step with an unmappable definition must not fail the run — it is dead. + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ + stepName: 's0', + stepIndex: 0, + done: true, + cancelled: true, + stepDefinition: makeTaskStepDef({ + taskType: 'unknown-future-type' as ServerTaskTypeEnum, + title: 't', + }), + }), + makeStepHistory({ stepName: 's1', stepIndex: 1, done: false }), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toEqual([]); + }); + + describe('lineageStepIndexes', () => { + it('is the own index alone for steps that were never revised', () => { + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }), + makeStepHistory({ stepName: 's1', stepIndex: 1, done: false }), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toEqual([ + expect.objectContaining({ lineageStepIndexes: [0] }), + ]); + }); + + it('walks a clone back to its original through originalStepIndex', () => { + const result = toAvailableStepExecution(makeRevisedRun()); + + expect(result?.previousSteps).toEqual([ + expect.objectContaining({ + stepOutcome: expect.objectContaining({ stepIndex: 0 }), + lineageStepIndexes: [0], + }), + expect.objectContaining({ + stepOutcome: expect.objectContaining({ stepIndex: 4 }), + lineageStepIndexes: [4, 1], + }), + ]); + }); + + it('includes every generation latest-first after a double revision', () => { + // idx 0: original (dead), idx 1: re-execution (dead, root 0), idx 2: clone of the + // re-execution (live, root 0 — originalStepIndex chains to the FIRST original). + // The intermediate generation at idx 1 is only discoverable via the dead entry. + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 'b1', stepIndex: 0, done: true, cancelled: true }), + makeClonedStepHistory({ stepName: 'b1', stepIndex: 1, done: true, cancelled: true }, 0), + makeClonedStepHistory({ stepName: 'b1', stepIndex: 2, done: true }, 0), + makeStepHistory({ stepName: 'next', stepIndex: 3, done: false }), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toEqual([ + expect.objectContaining({ + stepOutcome: expect.objectContaining({ stepIndex: 2 }), + lineageStepIndexes: [2, 1, 0], + }), + ]); + }); + + it('keeps separate lineages for two live instances of the same stepName (LinkTo loop)', () => { + // Cycles are a product feature (designer LinkTo tool): the same definition node can + // legitimately execute twice on the live path. Lineage is instance identity, not + // definition identity — the two iterations must not share candidates. + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 'load-x', stepIndex: 0, done: true }), + makeStepHistory({ stepName: 'cond', stepIndex: 1, done: true }), + makeStepHistory({ stepName: 'load-x', stepIndex: 2, done: true }), + makeStepHistory({ stepName: 'next', stepIndex: 3, done: false }), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toEqual([ + expect.objectContaining({ + stepOutcome: expect.objectContaining({ stepId: 'load-x', stepIndex: 0 }), + lineageStepIndexes: [0], + }), + expect.objectContaining({ + stepOutcome: expect.objectContaining({ stepId: 'cond', stepIndex: 1 }), + lineageStepIndexes: [1], + }), + expect.objectContaining({ + stepOutcome: expect.objectContaining({ stepId: 'load-x', stepIndex: 2 }), + lineageStepIndexes: [2], + }), + ]); + }); + }); + }); + describe('user mapping', () => { it('should map server userProfile to StepUser with null → empty string', () => { const profile: ServerUserProfile = { diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index bef89c9760..17d1b14851 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -4,6 +4,7 @@ import type { RunStore } from '../../src/ports/run-store'; import type { ExecutionContext, StepExecutionResult } from '../../src/types/execution-context'; import type { StepExecutionData } from '../../src/types/step-execution-data'; import type { RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { StepDefinition } from '../../src/types/validated/step-definition'; import type { BaseStepStatus, StepOutcome } from '../../src/types/validated/step-outcome'; import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy'; @@ -211,6 +212,125 @@ describe('BaseStepExecutor', () => { }); }); + describe('previous-steps summary after revision', () => { + // Revision clones keep their stepName but get a NEW stepIndex, while the RunStore keys + // executions by the ORIGINAL index. The summary must resolve a step's execution through + // its lineageStepIndexes (own index first, then earlier generations) instead of an exact + // stepIndex match — otherwise clone summaries lose all execution detail. + + function makeLineageEntry( + overrides: { stepId?: string; stepIndex?: number; prompt?: string }, + lineageStepIndexes: number[], + ): Step { + return { ...makeHistoryEntry(overrides), lineageStepIndexes }; + } + + it('resolves a clone summary through its lineage to the original execution', async () => { + const runStore = makeMockRunStore([ + { + type: 'condition', + stepIndex: 3, + executionParams: { answer: 'Yes', reasoning: 'REASONING-AT-IDX-3' }, + executionResult: { answer: 'Yes' }, + }, + ]); + const executor = new TestableExecutor( + makeContext({ + previousSteps: [ + makeLineageEntry({ stepId: 'cond-1', stepIndex: 7, prompt: 'Approve?' }, [7, 3]), + ], + runStore, + }), + ); + + const messages = await executor.buildPreviousStepsMessages(); + const content = messages[0].content as string; + + expect(content).toContain('Step "cond-1"'); + expect(content).toContain('REASONING-AT-IDX-3'); + }); + + it('prefers the freshest lineage generation when several executions exist', async () => { + const runStore = makeMockRunStore([ + { + type: 'condition', + stepIndex: 3, + executionParams: { answer: 'No', reasoning: 'STALE-GENERATION' }, + executionResult: { answer: 'No' }, + }, + { + type: 'condition', + stepIndex: 7, + executionParams: { answer: 'Yes', reasoning: 'FRESH-GENERATION' }, + executionResult: { answer: 'Yes' }, + }, + ]); + const executor = new TestableExecutor( + makeContext({ + previousSteps: [ + makeLineageEntry({ stepId: 'cond-1', stepIndex: 10, prompt: 'Approve?' }, [10, 7, 3]), + ], + runStore, + }), + ); + + const messages = await executor.buildPreviousStepsMessages(); + const content = messages[0].content as string; + + expect(content).toContain('FRESH-GENERATION'); + expect(content).not.toContain('STALE-GENERATION'); + }); + + it('falls back to outcome history details when no lineage generation has an execution', async () => { + const runStore = makeMockRunStore([]); + const executor = new TestableExecutor( + makeContext({ + previousSteps: [ + makeLineageEntry({ stepId: 'cond-1', stepIndex: 7, prompt: 'Approve?' }, [7, 3]), + ], + runStore, + }), + ); + + const messages = await executor.buildPreviousStepsMessages(); + const content = messages[0].content as string; + + expect(content).toContain('Step "cond-1"'); + expect(content).toContain('History:'); + }); + + it('does not surface executions of steps absent from previousSteps (dead branch)', async () => { + const runStore = makeMockRunStore([ + { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'Yes', reasoning: 'LIVE-INPUT' }, + executionResult: { answer: 'Yes' }, + }, + { + type: 'condition', + stepIndex: 5, + executionParams: { answer: 'No', reasoning: 'DEAD-INPUT' }, + executionResult: { answer: 'No' }, + }, + ]); + const executor = new TestableExecutor( + makeContext({ + previousSteps: [ + makeLineageEntry({ stepId: 'cond-1', stepIndex: 0, prompt: 'Approve?' }, [0]), + ], + runStore, + }), + ); + + const messages = await executor.buildPreviousStepsMessages(); + const content = messages[0].content as string; + + expect(content).toContain('LIVE-INPUT'); + expect(content).not.toContain('DEAD-INPUT'); + }); + }); + describe('execute error handling', () => { it('converts NoRecordsError to error outcome', async () => { const executor = new TestableExecutor(makeContext(), new NoRecordsError()); diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index 5ffe45226c..5c885c6b91 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -4,6 +4,7 @@ import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { LoadRelatedRecordStepExecutionData } from '../../src/types/step-execution-data'; import type { CollectionSchema, RecordData, RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { LoadRelatedRecordStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, RunStorePortError } from '../../src/errors'; @@ -196,6 +197,23 @@ function makePendingExecution( }; } +function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: number[]): Step { + return { + stepDefinition: { + type: StepType.LoadRelatedRecord, + executionType: StepExecutionMode.FullyAutomated, + prompt: 'Load the order', + }, + stepOutcome: { + type: 'record', + stepId: `load-${stepIndex}`, + stepIndex, + status: 'success', + }, + lineageStepIndexes, + }; +} + describe('LoadRelatedRecordStepExecutor', () => { describe('executionType=FullyAutomated: BelongsTo — load direct (Branch B)', () => { it('fetches 1 related record and returns success', async () => { @@ -2398,7 +2416,14 @@ describe('LoadRelatedRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + agentPort, + previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + }); const executor = new LoadRelatedRecordStepExecutor(context); const result = await executor.execute(); @@ -2709,7 +2734,13 @@ describe('LoadRelatedRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(2, [2]), makeLoadRelatedPreviousStep(3, [3])], + }); const executor = new LoadRelatedRecordStepExecutor(context); await executor.execute(); @@ -2829,4 +2860,51 @@ describe('LoadRelatedRecordStepExecutor', () => { expect(bindTools).toHaveBeenCalled(); }); }); + + describe('record pool after revision', () => { + it('re-executes a revised load step from the base record, not from the dead branch record', async () => { + // Given: the run loaded an owner before the user revised the "Load store" step. The + // owner's execution survives in the RunStore (dead branch), but the cleaned + // previousSteps no longer claims it. + const mockModel = makeMockModel({ relationName: 'Order', reasoning: 'reload' }); + const agentPort = makeMockAgentPort(); + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'load-related-record', + stepIndex: 2, + executionResult: { + relation: { name: 'owner', displayName: 'Owner' }, + record: makeRecordRef({ collectionName: 'owners', recordId: [7], stepIndex: 2 }), + }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const context = makeContext({ + model: mockModel.model, + agentPort, + runStore, + previousSteps: [], + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + const executor = new LoadRelatedRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then: the pool collapsed to the base record — no select-record round; the relation + // was selected and loaded directly from the base record (the ticket's bug offered the + // dead branch's owner as a source instead). + expect(result.stepOutcome.status).toBe('success'); + expect(mockModel.bindTools).toHaveBeenCalledTimes(1); + expect((mockModel.bindTools.mock.calls[0][0][0] as { name: string }).name).toBe( + 'select-relation', + ); + expect(agentPort.getRelatedData).toHaveBeenCalledWith( + { collection: 'customers', id: [42], relation: 'order', limit: 1 }, + expect.objectContaining({ id: 1 }), + ); + }); + }); }); diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index d483ce2d37..40c9a5fd4a 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -3,6 +3,7 @@ import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { CollectionSchema, RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { ReadRecordStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, NoRecordsError, RecordNotFoundError } from '../../src/errors'; @@ -142,6 +143,23 @@ function makeContext( }; } +function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: number[]): Step { + return { + stepDefinition: { + type: StepType.LoadRelatedRecord, + executionType: StepExecutionMode.FullyAutomated, + prompt: 'Load the order', + }, + stepOutcome: { + type: 'record', + stepId: `load-${stepIndex}`, + stepIndex, + status: 'success', + }, + lineageStepIndexes, + }; +} + describe('ReadRecordStepExecutor', () => { describe('single record, single field', () => { it('reads a single field and returns success', async () => { @@ -408,7 +426,13 @@ describe('ReadRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + }); const executor = new ReadRecordStepExecutor(context); const result = await executor.execute(); @@ -501,7 +525,14 @@ describe('ReadRecordStepExecutor', () => { const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } }, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + agentPort, + previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + }); const executor = new ReadRecordStepExecutor(context); const result = await executor.execute(); @@ -576,7 +607,13 @@ describe('ReadRecordStepExecutor', () => { orders: ordersSchema, }); const executor = new ReadRecordStepExecutor( - makeContext({ baseRecordRef, model, runStore, workflowPort }), + makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(5, [5])], + }), ); await executor.execute(); @@ -631,7 +668,13 @@ describe('ReadRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(1, [1])], + }); const executor = new ReadRecordStepExecutor(context); const result = await executor.execute(); @@ -922,13 +965,18 @@ describe('ReadRecordStepExecutor', () => { { type: 'load-related-record', stepIndex: 1, - executionResult: { record: relatedRef }, + executionResult: { + relation: { name: 'order', displayName: 'Order' }, + record: relatedRef, + }, + selectedRecordRef: makeRecordRef(), }, ]), }); const context = makeContext({ model: mockModel.model, runStore, + previousSteps: [makeLoadRelatedPreviousStep(1, [1])], stepDefinition: makeStep({ preRecordedArgs: { selectedRecordStepIndex: 1 }, }), @@ -1005,4 +1053,217 @@ describe('ReadRecordStepExecutor', () => { expect(mockModel.bindTools).toHaveBeenCalledTimes(1); }); }); + + describe('record pool after revision', () => { + // After a revision, records persisted in the RunStore by now-dead steps must leave the + // pool, while records produced by steps whose live representative is a CLONE (new + // stepIndex) must be resolved through the clone's lineageStepIndexes — the RunStore keys + // executions by the ORIGINAL index. + function makeLoadRelatedExecution(stepIndex: number, recordId: number) { + return { + type: 'load-related-record', + stepIndex, + executionResult: { + relation: { name: 'order', displayName: 'Order' }, + record: makeRecordRef({ collectionName: 'orders', recordId: [recordId], stepIndex }), + }, + selectedRecordRef: makeRecordRef(), + }; + } + + const ordersSchema = () => + makeCollectionSchema({ + collectionName: 'orders', + collectionDisplayName: 'Orders', + fields: [{ fieldName: 'total', displayName: 'Total', isRelationship: false }], + }); + + it('ignores records persisted by dead-branch steps when no live step claims them', async () => { + // Given: the RunStore still holds a record loaded before the revision, but the cleaned + // previousSteps no longer contains any load-related step. + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([makeLoadRelatedExecution(2, 99)]), + }); + const agentPort = makeMockAgentPort(); + const context = makeContext({ + model: mockModel.model, + runStore, + agentPort, + previousSteps: [], + }); + const executor = new ReadRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then: the pool collapsed to the base record — no select-record AI round happened, + // and the read targeted the base record, not the stale order. + expect(result.stepOutcome.status).toBe('success'); + expect(mockModel.bindTools).toHaveBeenCalledTimes(1); + expect((mockModel.bindTools.mock.calls[0][0][0] as { name: string }).name).toBe( + 'read-selected-record-fields', + ); + expect(agentPort.getRecord).toHaveBeenCalledTimes(1); + expect(agentPort.getRecord).toHaveBeenCalledWith( + expect.objectContaining({ collection: 'customers', id: [42] }), + expect.objectContaining({ id: 1 }), + ); + }); + + it('resolves a live clone record through its lineage to the original RunStore entry', async () => { + // Given: the live previous step is a clone at idx 7 whose execution (and record) was + // stored under the original idx 3. + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 3 - Orders #99' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'read-selected-record-fields', args: { fieldNames: ['total'] }, id: 'c2' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([makeLoadRelatedExecution(3, 99)]), + }); + const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } } }); + const context = makeContext({ + model, + runStore, + agentPort, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema(), + }), + previousSteps: [makeLoadRelatedPreviousStep(7, [7, 3])], + }); + const executor = new ReadRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then: the clone's record was offered and readable. + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.getRecord).toHaveBeenCalledWith( + expect.objectContaining({ collection: 'orders', id: [99] }), + expect.objectContaining({ id: 1 }), + ); + }); + + it('offers only the freshest lineage generation after a double revision', async () => { + // Given: the same logical step executed twice (idx 3 then idx 7, stale → fresh); its + // live representative is a clone at idx 10 with lineage [10, 7, 3]. Only the freshest + // execution's record may enter the pool. + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 7 - Orders #77' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'read-selected-record-fields', args: { fieldNames: ['total'] }, id: 'c2' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([makeLoadRelatedExecution(3, 99), makeLoadRelatedExecution(7, 77)]), + }); + const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } } }); + const context = makeContext({ + model, + runStore, + agentPort, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema(), + }), + previousSteps: [makeLoadRelatedPreviousStep(10, [10, 7, 3])], + }); + const executor = new ReadRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then: the select-record tool offered exactly base + freshest — the stale idx-3 record + // is absent from the enum. + const selectTool = bindTools.mock.calls[0][0][0] as { + name: string; + schema: { shape: { recordIdentifier: { options: string[] } } }; + }; + expect(selectTool.name).toBe('select-record'); + expect(selectTool.schema.shape.recordIdentifier.options).toEqual([ + 'Step 0 - Customers #42', + 'Step 7 - Orders #77', + ]); + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.getRecord).toHaveBeenCalledWith( + expect.objectContaining({ collection: 'orders', id: [77] }), + expect.objectContaining({ id: 1 }), + ); + }); + + it('offers both records when the same step ran twice on the live path (LinkTo loop)', async () => { + // Given: a loop executed the same load step at idx 0 and idx 2 — two distinct live + // instances, two distinct records. Lineage is per instance; neither shadows the other. + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 2 - Orders #77' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'read-selected-record-fields', args: { fieldNames: ['total'] }, id: 'c2' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([makeLoadRelatedExecution(0, 99), makeLoadRelatedExecution(2, 77)]), + }); + const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } } }); + const context = makeContext({ + model, + runStore, + agentPort, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema(), + }), + previousSteps: [makeLoadRelatedPreviousStep(0, [0]), makeLoadRelatedPreviousStep(2, [2])], + baseRecordRef: makeRecordRef({ stepIndex: 4 }), + }); + const executor = new ReadRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then + const selectTool = bindTools.mock.calls[0][0][0] as { + schema: { shape: { recordIdentifier: { options: string[] } } }; + }; + expect(selectTool.schema.shape.recordIdentifier.options).toEqual([ + 'Step 4 - Customers #42', + 'Step 0 - Orders #99', + 'Step 2 - Orders #77', + ]); + expect(result.stepOutcome.status).toBe('success'); + }); + }); }); diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index 59648d2751..6cf76ab478 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -4,6 +4,7 @@ import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { TriggerRecordActionStepExecutionData } from '../../src/types/step-execution-data'; import type { CollectionSchema, RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { TriggerActionStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, RunStorePortError, StepStateError } from '../../src/errors'; @@ -144,6 +145,23 @@ function makeContext( }; } +function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: number[]): Step { + return { + stepDefinition: { + type: StepType.LoadRelatedRecord, + executionType: StepExecutionMode.FullyAutomated, + prompt: 'Load the order', + }, + stepOutcome: { + type: 'record', + stepId: `load-${stepIndex}`, + stepIndex, + status: 'success', + }, + lineageStepIndexes, + }; +} + describe('TriggerRecordActionStepExecutor', () => { describe('executionType=FullyAutomated: trigger direct (Branch B)', () => { it('triggers the action and returns success', async () => { @@ -770,7 +788,14 @@ describe('TriggerRecordActionStepExecutor', () => { orders: ordersSchema, }); const agentPort = makeMockAgentPort(); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + agentPort, + previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + }); const executor = new TriggerRecordActionStepExecutor(context); const result = await executor.execute(); diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 48becbae9f..3e7ee83048 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -4,6 +4,7 @@ import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { UpdateRecordStepExecutionData } from '../../src/types/step-execution-data'; import type { CollectionSchema, RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { UpdateRecordStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, RunStorePortError, StepStateError } from '../../src/errors'; @@ -141,6 +142,23 @@ function makeContext( }; } +function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: number[]): Step { + return { + stepDefinition: { + type: StepType.LoadRelatedRecord, + executionType: StepExecutionMode.FullyAutomated, + prompt: 'Load the order', + }, + stepOutcome: { + type: 'record', + stepId: `load-${stepIndex}`, + stepIndex, + status: 'success', + }, + lineageStepIndexes, + }; +} + describe('UpdateRecordStepExecutor', () => { describe('executionType=FullyAutomated: update direct (Branch B)', () => { it('updates the record and returns success', async () => { @@ -546,7 +564,13 @@ describe('UpdateRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute(); From c2a44b0410829fa310ee031beb72d6af7a8faf95 Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Wed, 3 Jun 2026 15:36:18 +0200 Subject: [PATCH 2/7] test(workflow-executor): adapt revision repro to xToOne load flow The base branch's load-related rework routes BelongsTo loads through getSingleRelatedData instead of getRelatedData; the revision repro test asserted the old port call. Also rewords a test comment that referenced the ticket (review feedback). Co-Authored-By: Claude Opus 4.8 --- .../load-related-record-step-executor.test.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index 5c885c6b91..bf7e7fc94e 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -2894,15 +2894,19 @@ describe('LoadRelatedRecordStepExecutor', () => { const result = await executor.execute(); // Then: the pool collapsed to the base record — no select-record round; the relation - // was selected and loaded directly from the base record (the ticket's bug offered the - // dead branch's owner as a source instead). + // was selected and loaded directly from the base record (previously the dead branch's + // owner was offered as a source instead). expect(result.stepOutcome.status).toBe('success'); expect(mockModel.bindTools).toHaveBeenCalledTimes(1); expect((mockModel.bindTools.mock.calls[0][0][0] as { name: string }).name).toBe( 'select-relation', ); - expect(agentPort.getRelatedData).toHaveBeenCalledWith( - { collection: 'customers', id: [42], relation: 'order', limit: 1 }, + expect(agentPort.getSingleRelatedData).toHaveBeenCalledWith( + expect.objectContaining({ + collection: 'customers', + id: [42], + relation: 'order', + }), expect.objectContaining({ id: 1 }), ); }); From 584f656b278537bbb2c4301b9fa6cd8d46e37606 Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Wed, 3 Jun 2026 17:32:25 +0200 Subject: [PATCH 3/7] docs(workflow-executor): trim comments restating code or duplicated rationale Co-Authored-By: Claude Opus 4.8 --- packages/workflow-executor/CLAUDE.md | 2 +- .../src/adapters/run-to-available-step-mapper.ts | 6 +++--- .../workflow-executor/src/executors/base-step-executor.ts | 5 ++--- .../workflow-executor/src/executors/record-step-executor.ts | 6 +++--- packages/workflow-executor/src/types/validated/execution.ts | 3 +-- 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 11ae0bc427..8d3fe66f90 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -94,7 +94,7 @@ src/ - **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI. - **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class. - **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor. -- **Revision-aware history reads** — When a user revises a step, the orchestrator stamps the pivot card `revised: true` and every later entry `cancelled: true`, then appends clones of the still-valid card sub-steps (with `originalStepIndex` chaining to the FIRST original) plus the re-executed step. The live path is `filter(!revised && !cancelled)` — `toPreviousSteps` applies it, mirroring the orchestrator's own reads. Because clones run under a new `stepIndex` while their RunStore execution data stays keyed under the original one, each previous step carries `lineageStepIndexes` (own index first, then earlier generations descending, rebuilt by the mapper from the full history including dead entries). `BaseStepExecutor.resolveLineageExecution` walks these candidates freshest-first; both the record pool (`getAvailableRecordRefs`) and the previous-steps summary use it. Never key lookups on `stepName` — LinkTo loops can legitimately put the same step name on the live path twice. +- **Revision-aware history reads** — On revision the orchestrator (server-side) marks the pivot card `revised` and every later entry `cancelled`, then appends clones of the still-valid steps (`originalStepIndex` chains to the first original). Any consumer of `workflowHistory` must restrict to the live path (`!revised && !cancelled`) — skipping this is what leaks a superseded branch's context into a re-run. Resolve a step's RunStore execution through its `lineageStepIndexes`, never by `stepName` — LinkTo loops can legitimately put the same step name on the live path twice. - **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction. ## Commands diff --git a/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts b/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts index 20824424ec..b9c5ace323 100644 --- a/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts +++ b/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts @@ -93,7 +93,7 @@ function tryMapStep(s: ServerStepHistory, lineageStepIndexes: [number, ...number // All generations of the same logical step share the same root (originalStepIndex chains to // the FIRST original), so the lineage is rebuilt by grouping the FULL history — dead entries -// included, since they carry the intermediate generations. Own index first, then older ones. +// included, since they carry the intermediate generations. function toLineageStepIndexes( history: ServerStepHistory[], step: ServerStepHistory, @@ -107,8 +107,8 @@ function toLineageStepIndexes( return generations as [number, ...number[]]; } -// Mirrors the orchestrator's own read filter: revised (pivot anchor) and cancelled (dead -// branch) entries are not on the live path and must not reach the AI context. +// Mirrors the orchestrator's own read filter: revised and cancelled entries are not on the +// live path. function toPreviousSteps( history: ServerStepHistory[], pendingStepIndex: number, diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index c76b9bfb8f..871b40423b 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -294,9 +294,8 @@ export default abstract class BaseStepExecutor { const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); const relatedRecords = this.context.previousSteps.flatMap(step => { diff --git a/packages/workflow-executor/src/types/validated/execution.ts b/packages/workflow-executor/src/types/validated/execution.ts index a3fd6ec123..2494c4d2c7 100644 --- a/packages/workflow-executor/src/types/validated/execution.ts +++ b/packages/workflow-executor/src/types/validated/execution.ts @@ -27,8 +27,7 @@ export const StepSchema = z stepOutcome: StepOutcomeSchema, // RunStore execution lookup candidates, own stepIndex first then earlier same-step // generations descending. Revision clones run under a new stepIndex while their execution - // data stays keyed under the original one — consumers walk these until an execution is - // found. Absent (legacy producers) means "own index only". + // data stays keyed under the original one. Absent (legacy producers) means "own index only". lineageStepIndexes: z.array(z.number().int().nonnegative()).nonempty().optional(), }) .strict(); From 03e2b080145c9c45894ee490ca64b95420e7e63a Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Thu, 4 Jun 2026 17:18:47 +0200 Subject: [PATCH 4/7] fix(workflow-executor): resolve step records own-index-first, mirror front Replaces the lineage-walk with the frontend's carry-forward model: a step's record comes from its own RunStore entry, falling back to originalStepIndex only for a revision clone the executor never ran. Own-index-first stops a re-executed step that produced no record (failed / skipped / handled manually) from resurfacing its superseded original's record. Drops lineageStepIndexes from the Step type and the lineage grouping from the mapper. Co-Authored-By: Claude Opus 4.8 --- packages/workflow-executor/CLAUDE.md | 2 +- .../adapters/run-to-available-step-mapper.ts | 22 +---- .../src/executors/base-step-executor.ts | 18 ++-- .../src/executors/record-step-executor.ts | 2 +- .../src/types/validated/execution.ts | 9 +- .../run-to-available-step-mapper.test.ts | 77 ++-------------- .../test/executors/base-step-executor.test.ts | 46 ++++------ .../load-related-record-step-executor.test.ts | 8 +- .../read-record-step-executor.test.ts | 92 ++++++++----------- ...rigger-record-action-step-executor.test.ts | 6 +- .../update-record-step-executor.test.ts | 6 +- 11 files changed, 96 insertions(+), 192 deletions(-) diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 8d3fe66f90..1acc1e94f7 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -94,7 +94,7 @@ src/ - **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI. - **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class. - **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor. -- **Revision-aware history reads** — On revision the orchestrator (server-side) marks the pivot card `revised` and every later entry `cancelled`, then appends clones of the still-valid steps (`originalStepIndex` chains to the first original). Any consumer of `workflowHistory` must restrict to the live path (`!revised && !cancelled`) — skipping this is what leaks a superseded branch's context into a re-run. Resolve a step's RunStore execution through its `lineageStepIndexes`, never by `stepName` — LinkTo loops can legitimately put the same step name on the live path twice. +- **Revision-aware history reads** — On revision the orchestrator (server-side) marks the pivot card `revised` and every later entry `cancelled`, then appends clones of the still-valid steps (each clone's `originalStepIndex` points at the step it copies) plus a fresh re-execution of the revised step. Any consumer of `workflowHistory` must restrict to the live path (`!revised && !cancelled`) — skipping this leaks a superseded branch's context into a re-run. To find a step's RunStore execution, resolve own `stepIndex` first, then fall back to `originalStepIndex` (a clone the executor never ran inherits the copied step's record — mirrors the frontend's `carryForwardExecutorDataForCopiedSteps`). Own-index-first is essential: a re-executed step has its own entry, so it must never inherit the superseded original's record. Never key on `stepName` — LinkTo loops can put the same name on the live path twice. - **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction. ## Commands diff --git a/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts b/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts index b9c5ace323..ba90c1a671 100644 --- a/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts +++ b/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts @@ -76,12 +76,12 @@ function toStepOutcome(s: ServerStepHistory): StepOutcome { return { type: 'record', ...baseFromCtx, status } satisfies RecordStepOutcome; } -function tryMapStep(s: ServerStepHistory, lineageStepIndexes: [number, ...number[]]): Step | null { +function tryMapStep(s: ServerStepHistory): Step | null { try { return { stepDefinition: toStepDefinition(s.stepDefinition), stepOutcome: toStepOutcome(s), - lineageStepIndexes, + ...(s.originalStepIndex !== undefined && { originalStepIndex: s.originalStepIndex }), }; } catch (err) { // Sub-workflow navigation steps (start-sub-workflow, close-sub-workflow) are not @@ -91,22 +91,6 @@ function tryMapStep(s: ServerStepHistory, lineageStepIndexes: [number, ...number } } -// All generations of the same logical step share the same root (originalStepIndex chains to -// the FIRST original), so the lineage is rebuilt by grouping the FULL history — dead entries -// included, since they carry the intermediate generations. -function toLineageStepIndexes( - history: ServerStepHistory[], - step: ServerStepHistory, -): [number, ...number[]] { - const root = step.originalStepIndex ?? step.stepIndex; - const generations = history - .filter(s => (s.originalStepIndex ?? s.stepIndex) === root && s.stepIndex <= step.stepIndex) - .map(s => s.stepIndex) - .sort((a, b) => b - a); - - return generations as [number, ...number[]]; -} - // Mirrors the orchestrator's own read filter: revised and cancelled entries are not on the // live path. function toPreviousSteps( @@ -115,7 +99,7 @@ function toPreviousSteps( ): ReadonlyArray { return history .filter(s => s.done && !s.revised && !s.cancelled && s.stepIndex < pendingStepIndex) - .map(s => tryMapStep(s, toLineageStepIndexes(history, s))) + .map(s => tryMapStep(s)) .filter((s): s is Step => s !== null); } diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 871b40423b..bed2cf4936 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -285,7 +285,7 @@ export default abstract class BaseStepExecutor { - const execution = BaseStepExecutor.resolveLineageExecution(step, allStepExecutions); + const execution = BaseStepExecutor.resolveStepExecution(step, allStepExecutions); return StepSummaryBuilder.build(step.stepDefinition, step.stepOutcome, execution); }) @@ -294,17 +294,19 @@ export default abstract class BaseStepExecutor e.stepIndex === step.stepOutcome.stepIndex); + if (own) return own; - for (const stepIndex of candidates) { - const execution = executions.find(e => e.stepIndex === stepIndex); - if (execution) return execution; + if (step.originalStepIndex !== undefined) { + return executions.find(e => e.stepIndex === step.originalStepIndex); } return undefined; diff --git a/packages/workflow-executor/src/executors/record-step-executor.ts b/packages/workflow-executor/src/executors/record-step-executor.ts index 926769cacc..cbeccc85f0 100644 --- a/packages/workflow-executor/src/executors/record-step-executor.ts +++ b/packages/workflow-executor/src/executors/record-step-executor.ts @@ -55,7 +55,7 @@ export default abstract class RecordStepExecutor< const relatedRecords = this.context.previousSteps.flatMap(step => { if (step.stepDefinition.type !== StepType.LoadRelatedRecord) return []; - const execution = BaseStepExecutor.resolveLineageExecution(step, stepExecutions); + const execution = BaseStepExecutor.resolveStepExecution(step, stepExecutions); if ( execution?.type === 'load-related-record' && diff --git a/packages/workflow-executor/src/types/validated/execution.ts b/packages/workflow-executor/src/types/validated/execution.ts index 2494c4d2c7..7d857ccf5d 100644 --- a/packages/workflow-executor/src/types/validated/execution.ts +++ b/packages/workflow-executor/src/types/validated/execution.ts @@ -25,10 +25,11 @@ export const StepSchema = z .object({ stepDefinition: StepDefinitionSchema, stepOutcome: StepOutcomeSchema, - // RunStore execution lookup candidates, own stepIndex first then earlier same-step - // generations descending. Revision clones run under a new stepIndex while their execution - // data stays keyed under the original one. Absent (legacy producers) means "own index only". - lineageStepIndexes: z.array(z.number().int().nonnegative()).nonempty().optional(), + // Set on a revision clone (a still-valid step the orchestrator re-injects); points at the + // step it copies. The executor never ran the clone, so its record lives at this index — + // mirrors the frontend's carryForwardExecutorDataForCopiedSteps. Absent for steps the + // executor ran itself. + originalStepIndex: z.number().int().nonnegative().optional(), }) .strict(); export type Step = z.infer; diff --git a/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts b/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts index 51e59a5670..dacf007274 100644 --- a/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts +++ b/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts @@ -597,8 +597,8 @@ describe('toAvailableStepExecution', () => { expect(result?.previousSteps).toEqual([]); }); - describe('lineageStepIndexes', () => { - it('is the own index alone for steps that were never revised', () => { + describe('originalStepIndex', () => { + it('is absent for a step the executor ran itself (no revision)', () => { const run = makeRun({ workflowHistory: [ makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }), @@ -608,78 +608,21 @@ describe('toAvailableStepExecution', () => { const result = toAvailableStepExecution(run); - expect(result?.previousSteps).toEqual([ - expect.objectContaining({ lineageStepIndexes: [0] }), - ]); + expect(result?.previousSteps).toHaveLength(1); + expect(result?.previousSteps[0]).not.toHaveProperty('originalStepIndex'); }); - it('walks a clone back to its original through originalStepIndex', () => { + it('is set on a clone, pointing at the step it copies', () => { const result = toAvailableStepExecution(makeRevisedRun()); - expect(result?.previousSteps).toEqual([ - expect.objectContaining({ - stepOutcome: expect.objectContaining({ stepIndex: 0 }), - lineageStepIndexes: [0], - }), + // previousSteps = [trunk (ran, no original), card-b clone (copy of idx 1)] + expect(result?.previousSteps[0]).not.toHaveProperty('originalStepIndex'); + expect(result?.previousSteps[1]).toEqual( expect.objectContaining({ stepOutcome: expect.objectContaining({ stepIndex: 4 }), - lineageStepIndexes: [4, 1], - }), - ]); - }); - - it('includes every generation latest-first after a double revision', () => { - // idx 0: original (dead), idx 1: re-execution (dead, root 0), idx 2: clone of the - // re-execution (live, root 0 — originalStepIndex chains to the FIRST original). - // The intermediate generation at idx 1 is only discoverable via the dead entry. - const run = makeRun({ - workflowHistory: [ - makeStepHistory({ stepName: 'b1', stepIndex: 0, done: true, cancelled: true }), - makeClonedStepHistory({ stepName: 'b1', stepIndex: 1, done: true, cancelled: true }, 0), - makeClonedStepHistory({ stepName: 'b1', stepIndex: 2, done: true }, 0), - makeStepHistory({ stepName: 'next', stepIndex: 3, done: false }), - ], - }); - - const result = toAvailableStepExecution(run); - - expect(result?.previousSteps).toEqual([ - expect.objectContaining({ - stepOutcome: expect.objectContaining({ stepIndex: 2 }), - lineageStepIndexes: [2, 1, 0], - }), - ]); - }); - - it('keeps separate lineages for two live instances of the same stepName (LinkTo loop)', () => { - // Cycles are a product feature (designer LinkTo tool): the same definition node can - // legitimately execute twice on the live path. Lineage is instance identity, not - // definition identity — the two iterations must not share candidates. - const run = makeRun({ - workflowHistory: [ - makeStepHistory({ stepName: 'load-x', stepIndex: 0, done: true }), - makeStepHistory({ stepName: 'cond', stepIndex: 1, done: true }), - makeStepHistory({ stepName: 'load-x', stepIndex: 2, done: true }), - makeStepHistory({ stepName: 'next', stepIndex: 3, done: false }), - ], - }); - - const result = toAvailableStepExecution(run); - - expect(result?.previousSteps).toEqual([ - expect.objectContaining({ - stepOutcome: expect.objectContaining({ stepId: 'load-x', stepIndex: 0 }), - lineageStepIndexes: [0], - }), - expect.objectContaining({ - stepOutcome: expect.objectContaining({ stepId: 'cond', stepIndex: 1 }), - lineageStepIndexes: [1], - }), - expect.objectContaining({ - stepOutcome: expect.objectContaining({ stepId: 'load-x', stepIndex: 2 }), - lineageStepIndexes: [2], + originalStepIndex: 1, }), - ]); + ); }); }); }); diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 17d1b14851..0b210265f7 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -213,19 +213,19 @@ describe('BaseStepExecutor', () => { }); describe('previous-steps summary after revision', () => { - // Revision clones keep their stepName but get a NEW stepIndex, while the RunStore keys - // executions by the ORIGINAL index. The summary must resolve a step's execution through - // its lineageStepIndexes (own index first, then earlier generations) instead of an exact - // stepIndex match — otherwise clone summaries lose all execution detail. + // A revision clone never ran under its own (new) stepIndex — its execution lives at the + // copied step's index (originalStepIndex). The summary resolves own-index first, then the + // copied step; own-first is what stops a re-executed step from resurfacing the superseded + // original's execution detail. - function makeLineageEntry( + function makeCloneEntry( overrides: { stepId?: string; stepIndex?: number; prompt?: string }, - lineageStepIndexes: number[], + originalStepIndex: number, ): Step { - return { ...makeHistoryEntry(overrides), lineageStepIndexes }; + return { ...makeHistoryEntry(overrides), originalStepIndex }; } - it('resolves a clone summary through its lineage to the original execution', async () => { + it("resolves a clone summary from the copied step's execution", async () => { const runStore = makeMockRunStore([ { type: 'condition', @@ -236,9 +236,8 @@ describe('BaseStepExecutor', () => { ]); const executor = new TestableExecutor( makeContext({ - previousSteps: [ - makeLineageEntry({ stepId: 'cond-1', stepIndex: 7, prompt: 'Approve?' }, [7, 3]), - ], + // clone runs under idx 7 but copies idx 3; the executor never ran idx 7 + previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)], runStore, }), ); @@ -250,26 +249,25 @@ describe('BaseStepExecutor', () => { expect(content).toContain('REASONING-AT-IDX-3'); }); - it('prefers the freshest lineage generation when several executions exist', async () => { + it("uses the step's own execution, never the copied original's, when both exist", async () => { const runStore = makeMockRunStore([ { type: 'condition', stepIndex: 3, - executionParams: { answer: 'No', reasoning: 'STALE-GENERATION' }, + executionParams: { answer: 'No', reasoning: 'SUPERSEDED-ORIGINAL' }, executionResult: { answer: 'No' }, }, { type: 'condition', stepIndex: 7, - executionParams: { answer: 'Yes', reasoning: 'FRESH-GENERATION' }, + executionParams: { answer: 'Yes', reasoning: 'OWN-FRESH' }, executionResult: { answer: 'Yes' }, }, ]); const executor = new TestableExecutor( makeContext({ - previousSteps: [ - makeLineageEntry({ stepId: 'cond-1', stepIndex: 10, prompt: 'Approve?' }, [10, 7, 3]), - ], + // a re-executed step carries originalStepIndex but has its OWN execution at idx 7 + previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)], runStore, }), ); @@ -277,17 +275,15 @@ describe('BaseStepExecutor', () => { const messages = await executor.buildPreviousStepsMessages(); const content = messages[0].content as string; - expect(content).toContain('FRESH-GENERATION'); - expect(content).not.toContain('STALE-GENERATION'); + expect(content).toContain('OWN-FRESH'); + expect(content).not.toContain('SUPERSEDED-ORIGINAL'); }); - it('falls back to outcome history details when no lineage generation has an execution', async () => { + it('falls back to outcome History when neither own nor copied step has an execution', async () => { const runStore = makeMockRunStore([]); const executor = new TestableExecutor( makeContext({ - previousSteps: [ - makeLineageEntry({ stepId: 'cond-1', stepIndex: 7, prompt: 'Approve?' }, [7, 3]), - ], + previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)], runStore, }), ); @@ -316,9 +312,7 @@ describe('BaseStepExecutor', () => { ]); const executor = new TestableExecutor( makeContext({ - previousSteps: [ - makeLineageEntry({ stepId: 'cond-1', stepIndex: 0, prompt: 'Approve?' }, [0]), - ], + previousSteps: [makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0 })], runStore, }), ); diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index bf7e7fc94e..e8097e85cf 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -197,7 +197,7 @@ function makePendingExecution( }; } -function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: number[]): Step { +function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { return { stepDefinition: { type: StepType.LoadRelatedRecord, @@ -210,7 +210,7 @@ function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: numb stepIndex, status: 'success', }, - lineageStepIndexes, + ...(originalStepIndex !== undefined && { originalStepIndex }), }; } @@ -2422,7 +2422,7 @@ describe('LoadRelatedRecordStepExecutor', () => { runStore, workflowPort, agentPort, - previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + previousSteps: [makeLoadRelatedPreviousStep(2)], }); const executor = new LoadRelatedRecordStepExecutor(context); @@ -2739,7 +2739,7 @@ describe('LoadRelatedRecordStepExecutor', () => { model, runStore, workflowPort, - previousSteps: [makeLoadRelatedPreviousStep(2, [2]), makeLoadRelatedPreviousStep(3, [3])], + previousSteps: [makeLoadRelatedPreviousStep(2), makeLoadRelatedPreviousStep(3)], }); const executor = new LoadRelatedRecordStepExecutor(context); diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index 40c9a5fd4a..e5d789bc2f 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -143,7 +143,7 @@ function makeContext( }; } -function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: number[]): Step { +function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { return { stepDefinition: { type: StepType.LoadRelatedRecord, @@ -156,7 +156,7 @@ function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: numb stepIndex, status: 'success', }, - lineageStepIndexes, + ...(originalStepIndex !== undefined && { originalStepIndex }), }; } @@ -431,7 +431,7 @@ describe('ReadRecordStepExecutor', () => { model, runStore, workflowPort, - previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + previousSteps: [makeLoadRelatedPreviousStep(2)], }); const executor = new ReadRecordStepExecutor(context); @@ -531,7 +531,7 @@ describe('ReadRecordStepExecutor', () => { runStore, workflowPort, agentPort, - previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + previousSteps: [makeLoadRelatedPreviousStep(2)], }); const executor = new ReadRecordStepExecutor(context); @@ -612,7 +612,7 @@ describe('ReadRecordStepExecutor', () => { model, runStore, workflowPort, - previousSteps: [makeLoadRelatedPreviousStep(5, [5])], + previousSteps: [makeLoadRelatedPreviousStep(5)], }), ); @@ -673,7 +673,7 @@ describe('ReadRecordStepExecutor', () => { model, runStore, workflowPort, - previousSteps: [makeLoadRelatedPreviousStep(1, [1])], + previousSteps: [makeLoadRelatedPreviousStep(1)], }); const executor = new ReadRecordStepExecutor(context); @@ -976,7 +976,7 @@ describe('ReadRecordStepExecutor', () => { const context = makeContext({ model: mockModel.model, runStore, - previousSteps: [makeLoadRelatedPreviousStep(1, [1])], + previousSteps: [makeLoadRelatedPreviousStep(1)], stepDefinition: makeStep({ preRecordedArgs: { selectedRecordStepIndex: 1 }, }), @@ -1055,10 +1055,10 @@ describe('ReadRecordStepExecutor', () => { }); describe('record pool after revision', () => { - // After a revision, records persisted in the RunStore by now-dead steps must leave the - // pool, while records produced by steps whose live representative is a CLONE (new - // stepIndex) must be resolved through the clone's lineageStepIndexes — the RunStore keys - // executions by the ORIGINAL index. + // After a revision, dead-branch records must leave the pool. A live clone (new stepIndex, + // never run by the executor) inherits its record from the copied step (originalStepIndex); + // a re-executed step uses its OWN entry and never resurfaces the superseded original's + // record — own-index-first resolution. function makeLoadRelatedExecution(stepIndex: number, recordId: number) { return { type: 'load-related-record', @@ -1111,9 +1111,9 @@ describe('ReadRecordStepExecutor', () => { ); }); - it('resolves a live clone record through its lineage to the original RunStore entry', async () => { - // Given: the live previous step is a clone at idx 7 whose execution (and record) was - // stored under the original idx 3. + it("resolves a clone's record from the copied step's original RunStore entry", async () => { + // Given: the live previous step is a clone at idx 7 (never run by the executor) that + // copies idx 3, where its execution and record are stored. const invoke = jest .fn() .mockResolvedValueOnce({ @@ -1141,7 +1141,7 @@ describe('ReadRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema(), }), - previousSteps: [makeLoadRelatedPreviousStep(7, [7, 3])], + previousSteps: [makeLoadRelatedPreviousStep(7, 3)], }); const executor = new ReadRecordStepExecutor(context); @@ -1156,67 +1156,47 @@ describe('ReadRecordStepExecutor', () => { ); }); - it('offers only the freshest lineage generation after a double revision', async () => { - // Given: the same logical step executed twice (idx 3 then idx 7, stale → fresh); its - // live representative is a clone at idx 10 with lineage [10, 7, 3]. Only the freshest - // execution's record may enter the pool. - const invoke = jest - .fn() - .mockResolvedValueOnce({ - tool_calls: [ - { name: 'select-record', args: { recordIdentifier: 'Step 7 - Orders #77' }, id: 'c1' }, - ], - }) - .mockResolvedValueOnce({ - tool_calls: [ - { name: 'read-selected-record-fields', args: { fieldNames: ['total'] }, id: 'c2' }, - ], - }); - const bindTools = jest.fn().mockReturnValue({ invoke }); - const model = { bindTools } as unknown as ExecutionContext['model']; - + it('does not resurface the copied original record when the re-executed step has no record (Alban repro)', async () => { + // Given: a re-executed load step at idx 10 that copies idx 3 (originalStepIndex 3, where a + // record was loaded before the revision) but produced no record of its own (skipped / + // handled manually — its own entry has no executionResult.record). + const mockModel = makeMockModel({ fieldNames: ['email'] }); const runStore = makeMockRunStore({ getStepExecutions: jest .fn() - .mockResolvedValue([makeLoadRelatedExecution(3, 99), makeLoadRelatedExecution(7, 77)]), + .mockResolvedValue([ + makeLoadRelatedExecution(3, 99), + { type: 'load-related-record', stepIndex: 10, executionResult: { skipped: true } }, + ]), }); - const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } } }); + const agentPort = makeMockAgentPort(); const context = makeContext({ - model, + model: mockModel.model, runStore, agentPort, - workflowPort: makeMockWorkflowPort({ - customers: makeCollectionSchema(), - orders: ordersSchema(), - }), - previousSteps: [makeLoadRelatedPreviousStep(10, [10, 7, 3])], + previousSteps: [makeLoadRelatedPreviousStep(10, 3)], }); const executor = new ReadRecordStepExecutor(context); // When const result = await executor.execute(); - // Then: the select-record tool offered exactly base + freshest — the stale idx-3 record - // is absent from the enum. - const selectTool = bindTools.mock.calls[0][0][0] as { - name: string; - schema: { shape: { recordIdentifier: { options: string[] } } }; - }; - expect(selectTool.name).toBe('select-record'); - expect(selectTool.schema.shape.recordIdentifier.options).toEqual([ - 'Step 0 - Customers #42', - 'Step 7 - Orders #77', - ]); + // Then: own-index-first resolves idx 10 (record-less), so the dead idx-3 record never + // enters the pool — it collapses to the base record, no select-record round. expect(result.stepOutcome.status).toBe('success'); + expect(mockModel.bindTools).toHaveBeenCalledTimes(1); + expect((mockModel.bindTools.mock.calls[0][0][0] as { name: string }).name).toBe( + 'read-selected-record-fields', + ); expect(agentPort.getRecord).toHaveBeenCalledWith( - expect.objectContaining({ collection: 'orders', id: [77] }), + expect.objectContaining({ collection: 'customers', id: [42] }), expect.objectContaining({ id: 1 }), ); }); it('offers both records when the same step ran twice on the live path (LinkTo loop)', async () => { // Given: a loop executed the same load step at idx 0 and idx 2 — two distinct live - // instances, two distinct records. Lineage is per instance; neither shadows the other. + // instances, two distinct records, each resolved from its own index; neither shadows the other. const invoke = jest .fn() .mockResolvedValueOnce({ @@ -1246,7 +1226,7 @@ describe('ReadRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema(), }), - previousSteps: [makeLoadRelatedPreviousStep(0, [0]), makeLoadRelatedPreviousStep(2, [2])], + previousSteps: [makeLoadRelatedPreviousStep(0), makeLoadRelatedPreviousStep(2)], baseRecordRef: makeRecordRef({ stepIndex: 4 }), }); const executor = new ReadRecordStepExecutor(context); diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index 6cf76ab478..5c4d417809 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -145,7 +145,7 @@ function makeContext( }; } -function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: number[]): Step { +function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { return { stepDefinition: { type: StepType.LoadRelatedRecord, @@ -158,7 +158,7 @@ function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: numb stepIndex, status: 'success', }, - lineageStepIndexes, + ...(originalStepIndex !== undefined && { originalStepIndex }), }; } @@ -794,7 +794,7 @@ describe('TriggerRecordActionStepExecutor', () => { runStore, workflowPort, agentPort, - previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + previousSteps: [makeLoadRelatedPreviousStep(2)], }); const executor = new TriggerRecordActionStepExecutor(context); diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 3e7ee83048..6b6cc4e759 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -142,7 +142,7 @@ function makeContext( }; } -function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: number[]): Step { +function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { return { stepDefinition: { type: StepType.LoadRelatedRecord, @@ -155,7 +155,7 @@ function makeLoadRelatedPreviousStep(stepIndex: number, lineageStepIndexes: numb stepIndex, status: 'success', }, - lineageStepIndexes, + ...(originalStepIndex !== undefined && { originalStepIndex }), }; } @@ -569,7 +569,7 @@ describe('UpdateRecordStepExecutor', () => { model, runStore, workflowPort, - previousSteps: [makeLoadRelatedPreviousStep(2, [2])], + previousSteps: [makeLoadRelatedPreviousStep(2)], }); const executor = new UpdateRecordStepExecutor(context); From e17962eafe8d3553296925e8f3fa55b7e93194a1 Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Thu, 4 Jun 2026 17:36:03 +0200 Subject: [PATCH 5/7] docs(workflow-executor): correct getAvailableRecordRefs comment to own-index-first Co-Authored-By: Claude Opus 4.8 --- .../workflow-executor/src/executors/record-step-executor.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/workflow-executor/src/executors/record-step-executor.ts b/packages/workflow-executor/src/executors/record-step-executor.ts index cbeccc85f0..79d7a55c58 100644 --- a/packages/workflow-executor/src/executors/record-step-executor.ts +++ b/packages/workflow-executor/src/executors/record-step-executor.ts @@ -48,8 +48,8 @@ export default abstract class RecordStepExecutor< } // Candidate sources for the AI: the base record plus the record each live prior - // load-related step resolved (via lineage — clones key their execution under the original - // stepIndex). + // load-related step resolved — own stepIndex first, falling back to a clone's + // originalStepIndex. protected async getAvailableRecordRefs(): Promise { const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); const relatedRecords = this.context.previousSteps.flatMap(step => { From 76f58fb9ca16f8597c1c22c74bb7c3f15136fa65 Mon Sep 17 00:00:00 2001 From: Brian Fox Date: Thu, 4 Jun 2026 18:01:29 +0200 Subject: [PATCH 6/7] docs(workflow-executor): make revision-resolution comments self-contained Drop frontend cross-references and the contested orchestrator-internal claim about re-executions carrying originalStepIndex; describe only what the executor uses the field for. Co-Authored-By: Claude Opus 4.8 --- packages/workflow-executor/src/adapters/server-types.ts | 2 +- .../workflow-executor/src/executors/base-step-executor.ts | 6 +++--- packages/workflow-executor/src/types/validated/execution.ts | 5 ++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/workflow-executor/src/adapters/server-types.ts b/packages/workflow-executor/src/adapters/server-types.ts index a334fa883a..f0d56a802a 100644 --- a/packages/workflow-executor/src/adapters/server-types.ts +++ b/packages/workflow-executor/src/adapters/server-types.ts @@ -169,7 +169,7 @@ export interface ServerStepHistory { done: boolean; revised?: boolean; cancelled?: boolean; - // Set on revision clones and re-executions; chains to the FIRST original across revisions. + // On a revision clone, the index of the step it copies — where that step's record lives. originalStepIndex?: number; context?: Record; childrenWorkflowId?: string; diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index bed2cf4936..90c9bf1f0c 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -295,9 +295,9 @@ export default abstract class BaseStepExecutor Date: Fri, 5 Jun 2026 15:52:23 +0200 Subject: [PATCH 7/7] refactor(workflow-executor): make resolveStepExecution an instance method Per PR review: resolveStepExecution was static. Since RecordStepExecutor extends BaseStepExecutor, make it a protected instance method and call it via `this.`. Pure helper, no behavior change. Co-Authored-By: Claude Opus 4.8 --- .../workflow-executor/src/executors/base-step-executor.ts | 4 ++-- .../workflow-executor/src/executors/record-step-executor.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 90c9bf1f0c..3b15e2b914 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -285,7 +285,7 @@ export default abstract class BaseStepExecutor { - const execution = BaseStepExecutor.resolveStepExecution(step, allStepExecutions); + const execution = this.resolveStepExecution(step, allStepExecutions); return StepSummaryBuilder.build(step.stepDefinition, step.stepOutcome, execution); }) @@ -298,7 +298,7 @@ export default abstract class BaseStepExecutor { if (step.stepDefinition.type !== StepType.LoadRelatedRecord) return []; - const execution = BaseStepExecutor.resolveStepExecution(step, stepExecutions); + const execution = this.resolveStepExecution(step, stepExecutions); if ( execution?.type === 'load-related-record' &&