Skip to content
1 change: 1 addition & 0 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ src/
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class.
- **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor.
- **Revision-aware history reads** — On revision the orchestrator (server-side) marks the pivot card `revised` and every later entry `cancelled`, then appends clones of the still-valid steps (each clone's `originalStepIndex` points at the step it copies) plus a fresh re-execution of the revised step. Any consumer of `workflowHistory` must restrict to the live path (`!revised && !cancelled`) — skipping this leaks a superseded branch's context into a re-run. To find a step's RunStore execution, resolve own `stepIndex` first, then fall back to `originalStepIndex` (a clone the executor never ran inherits the copied step's record — mirrors the frontend's `carryForwardExecutorDataForCopiedSteps`). Own-index-first is essential: a re-executed step has its own entry, so it must never inherit the superseded original's record. Never key on `stepName` — LinkTo loops can put the same name on the live path twice.
- **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction.

## Commands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ function tryMapStep(s: ServerStepHistory): Step | null {
return {
stepDefinition: toStepDefinition(s.stepDefinition),
stepOutcome: toStepOutcome(s),
...(s.originalStepIndex !== undefined && { originalStepIndex: s.originalStepIndex }),
};
} catch (err) {
// Sub-workflow navigation steps (start-sub-workflow, close-sub-workflow) are not
Expand All @@ -90,12 +91,14 @@ function tryMapStep(s: ServerStepHistory): Step | null {
}
}

// Mirrors the orchestrator's own read filter: revised and cancelled entries are not on the
// live path.
function toPreviousSteps(
history: ServerStepHistory[],
pendingStepIndex: number,
): ReadonlyArray<Step> {
return history
.filter(s => s.done && s.stepIndex < pendingStepIndex)
.filter(s => s.done && !s.revised && !s.cancelled && s.stepIndex < pendingStepIndex)
.map(s => tryMapStep(s))
.filter((s): s is Step => s !== null);
}
Expand Down
2 changes: 2 additions & 0 deletions packages/workflow-executor/src/adapters/server-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ export interface ServerStepHistory {
done: boolean;
revised?: boolean;
cancelled?: boolean;
// On a revision clone, the index of the step it copies — where that step's record lives.
originalStepIndex?: number;
context?: Record<string, unknown>;
childrenWorkflowId?: string;
stepDefinition: ServerWorkflowStep;
Expand Down
25 changes: 22 additions & 3 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
StepExecutionResult,
} from '../types/execution-context';
import type { ConfirmableStepExecutionData, StepExecutionData } from '../types/step-execution-data';
import type { Step } from '../types/validated/execution';
import type { StepDefinition } from '../types/validated/step-definition';
import type { StepStatus } from '../types/validated/step-outcome';
import type {
Expand Down Expand Up @@ -283,16 +284,34 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St

const allStepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);
const summary = this.context.previousSteps
.map(({ stepDefinition, stepOutcome }) => {
const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex);
.map(step => {
const execution = this.resolveStepExecution(step, allStepExecutions);

return StepSummaryBuilder.build(stepDefinition, stepOutcome, execution);
return StepSummaryBuilder.build(step.stepDefinition, step.stepOutcome, execution);
})
.join('\n\n');

return [new SystemMessage(summary)];
}

// A step the executor ran has its execution at its own stepIndex. A revision clone never ran,
// so it inherits the record of the step it copied (originalStepIndex). Own-index-first is what
// stops a re-executed step (which has its own entry) from resurfacing the superseded
// original's record.
protected resolveStepExecution(
step: Step,
executions: StepExecutionData[],
): StepExecutionData | undefined {
const own = executions.find(e => e.stepIndex === step.stepOutcome.stepIndex);
if (own) return own;

if (step.originalStepIndex !== undefined) {
return executions.find(e => e.stepIndex === step.originalStepIndex);
}

return undefined;
}

private static mergeLeadingSystemMessages(messages: BaseMessage[]): BaseMessage[] {
let i = 0;
while (i < messages.length && messages[i] instanceof SystemMessage) i += 1;
Expand Down
18 changes: 13 additions & 5 deletions packages/workflow-executor/src/executors/record-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { z } from 'zod';

import { InvalidAIResponseError, InvalidPreRecordedArgsError, NoRecordsError } from '../errors';
import BaseStepExecutor from './base-step-executor';
import { StepType } from '../types/validated/step-definition';

export default abstract class RecordStepExecutor<
TStep extends StepDefinition = StepDefinition,
Expand Down Expand Up @@ -46,15 +47,22 @@ export default abstract class RecordStepExecutor<
return this.selectRecordRef(records, prompt);
}

// Candidate sources for the AI: the base record plus the record each live prior
// load-related step resolved — own stepIndex first, falling back to a clone's
// originalStepIndex.
protected async getAvailableRecordRefs(): Promise<RecordRef[]> {
const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);
const relatedRecords = stepExecutions.flatMap(e => {
const relatedRecords = this.context.previousSteps.flatMap(step => {
if (step.stepDefinition.type !== StepType.LoadRelatedRecord) return [];
Comment thread
hercemer42 marked this conversation as resolved.

const execution = this.resolveStepExecution(step, stepExecutions);

if (
e.type === 'load-related-record' &&
e.executionResult !== undefined &&
'record' in e.executionResult
execution?.type === 'load-related-record' &&
execution.executionResult !== undefined &&
'record' in execution.executionResult
) {
return [e.executionResult.record];
return [execution.executionResult.record];
}

return [];
Expand Down
4 changes: 4 additions & 0 deletions packages/workflow-executor/src/types/validated/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ export const StepSchema = z
.object({
stepDefinition: StepDefinitionSchema,
stepOutcome: StepOutcomeSchema,
// Set on a revision clone (a still-valid step the orchestrator re-injects); points at the
// step it copies. The executor never ran the clone, so its record lives at that index.
// Absent for steps the executor ran itself.
originalStepIndex: z.number().int().nonnegative().optional(),
})
.strict();
export type Step = z.infer<typeof StepSchema>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,132 @@ describe('toAvailableStepExecution', () => {
});
});

describe('revision handling', () => {
// Revision model (orchestrator): the pivot card is stamped revised:true, every step after
// it is stamped cancelled:true, and clones of the still-valid card sub-steps are appended
// at the tail with originalStepIndex chaining to the FIRST original. The live path is
// filter(!revised && !cancelled).

function makeClonedStepHistory(
overrides: Partial<ServerStepHistory>,
originalStepIndex: number,
): ServerStepHistory {
return { ...makeStepHistory(overrides), originalStepIndex };
}

/**
* Canonical single-revision scenario (sub B1 of card B revised):
* idx 0 trunk task done ← live
* idx 1 card B done, revised ← pivot anchor (dead)
* idx 2 sub B1 done, cancelled ← dead branch
* idx 3 card C done, cancelled ← dead branch
* idx 4 card B clone done, originalStepIndex 1 ← live
* idx 5 sub B1 re-exec pending ← current
*/
function makeRevisedRun(): ServerHydratedWorkflowRun {
return makeRun({
workflowHistory: [
makeStepHistory({ stepName: 'trunk', stepIndex: 0, done: true }),
makeStepHistory({ stepName: 'card-b', stepIndex: 1, done: true, revised: true }),
makeStepHistory({ stepName: 'sub-b1', stepIndex: 2, done: true, cancelled: true }),
makeStepHistory({ stepName: 'card-c', stepIndex: 3, done: true, cancelled: true }),
makeClonedStepHistory({ stepName: 'card-b', stepIndex: 4, done: true }, 1),
makeClonedStepHistory({ stepName: 'sub-b1', stepIndex: 5, done: false }, 2),
],
});
}

it('excludes cancelled steps from previousSteps', () => {
const run = makeRun({
workflowHistory: [
makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }),
makeStepHistory({ stepName: 's1', stepIndex: 1, done: true, cancelled: true }),
makeStepHistory({ stepName: 's2', stepIndex: 2, done: false }),
],
});

const result = toAvailableStepExecution(run);

expect(result?.previousSteps).toHaveLength(1);
expect(result?.previousSteps[0].stepOutcome.stepId).toBe('s0');
});

it('excludes the revised anchor while keeping its live clone', () => {
const result = toAvailableStepExecution(makeRevisedRun());

const indexes = result?.previousSteps.map(s => s.stepOutcome.stepIndex);
expect(indexes).toEqual([0, 4]);
});

it('returns empty previousSteps when the entry point is revised', () => {
// Entry-point revision: the pivot IS the first step, so nothing valid precedes the
// re-execution — context must collapse to a clean slate.
const run = makeRun({
workflowHistory: [
makeStepHistory({ stepName: 'entry', stepIndex: 0, done: true, revised: true }),
makeStepHistory({ stepName: 's1', stepIndex: 1, done: true, cancelled: true }),
makeClonedStepHistory({ stepName: 'entry', stepIndex: 2, done: false }, 0),
],
});

const result = toAvailableStepExecution(run);

expect(result?.previousSteps).toEqual([]);
});

it('does not attempt to map dead-branch steps (filtering precedes mapping)', () => {
// A cancelled step with an unmappable definition must not fail the run — it is dead.
const run = makeRun({
workflowHistory: [
makeStepHistory({
stepName: 's0',
stepIndex: 0,
done: true,
cancelled: true,
stepDefinition: makeTaskStepDef({
taskType: 'unknown-future-type' as ServerTaskTypeEnum,
title: 't',
}),
}),
makeStepHistory({ stepName: 's1', stepIndex: 1, done: false }),
],
});

const result = toAvailableStepExecution(run);

expect(result?.previousSteps).toEqual([]);
});

describe('originalStepIndex', () => {
it('is absent for a step the executor ran itself (no revision)', () => {
const run = makeRun({
workflowHistory: [
makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }),
makeStepHistory({ stepName: 's1', stepIndex: 1, done: false }),
],
});

const result = toAvailableStepExecution(run);

expect(result?.previousSteps).toHaveLength(1);
expect(result?.previousSteps[0]).not.toHaveProperty('originalStepIndex');
});

it('is set on a clone, pointing at the step it copies', () => {
const result = toAvailableStepExecution(makeRevisedRun());

// previousSteps = [trunk (ran, no original), card-b clone (copy of idx 1)]
expect(result?.previousSteps[0]).not.toHaveProperty('originalStepIndex');
expect(result?.previousSteps[1]).toEqual(
expect.objectContaining({
stepOutcome: expect.objectContaining({ stepIndex: 4 }),
originalStepIndex: 1,
}),
);
});
});
});

describe('user mapping', () => {
it('should map server userProfile to StepUser with null → empty string', () => {
const profile: ServerUserProfile = {
Expand Down
114 changes: 114 additions & 0 deletions packages/workflow-executor/test/executors/base-step-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { RunStore } from '../../src/ports/run-store';
import type { ExecutionContext, StepExecutionResult } from '../../src/types/execution-context';
import type { StepExecutionData } from '../../src/types/step-execution-data';
import type { RecordRef } from '../../src/types/validated/collection';
import type { Step } from '../../src/types/validated/execution';
import type { StepDefinition } from '../../src/types/validated/step-definition';
import type { BaseStepStatus, StepOutcome } from '../../src/types/validated/step-outcome';
import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy';
Expand Down Expand Up @@ -211,6 +212,119 @@ describe('BaseStepExecutor', () => {
});
});

describe('previous-steps summary after revision', () => {
// A revision clone never ran under its own (new) stepIndex — its execution lives at the
// copied step's index (originalStepIndex). The summary resolves own-index first, then the
// copied step; own-first is what stops a re-executed step from resurfacing the superseded
// original's execution detail.

function makeCloneEntry(
overrides: { stepId?: string; stepIndex?: number; prompt?: string },
originalStepIndex: number,
): Step {
return { ...makeHistoryEntry(overrides), originalStepIndex };
}

it("resolves a clone summary from the copied step's execution", async () => {
const runStore = makeMockRunStore([
{
type: 'condition',
stepIndex: 3,
executionParams: { answer: 'Yes', reasoning: 'REASONING-AT-IDX-3' },
executionResult: { answer: 'Yes' },
},
]);
const executor = new TestableExecutor(
makeContext({
// clone runs under idx 7 but copies idx 3; the executor never ran idx 7
previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)],
runStore,
}),
);

const messages = await executor.buildPreviousStepsMessages();
const content = messages[0].content as string;

expect(content).toContain('Step "cond-1"');
expect(content).toContain('REASONING-AT-IDX-3');
});

it("uses the step's own execution, never the copied original's, when both exist", async () => {
const runStore = makeMockRunStore([
{
type: 'condition',
stepIndex: 3,
executionParams: { answer: 'No', reasoning: 'SUPERSEDED-ORIGINAL' },
executionResult: { answer: 'No' },
},
{
type: 'condition',
stepIndex: 7,
executionParams: { answer: 'Yes', reasoning: 'OWN-FRESH' },
executionResult: { answer: 'Yes' },
},
]);
const executor = new TestableExecutor(
makeContext({
// a re-executed step carries originalStepIndex but has its OWN execution at idx 7
previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)],
runStore,
}),
);

const messages = await executor.buildPreviousStepsMessages();
const content = messages[0].content as string;

expect(content).toContain('OWN-FRESH');
expect(content).not.toContain('SUPERSEDED-ORIGINAL');
});

it('falls back to outcome History when neither own nor copied step has an execution', async () => {
const runStore = makeMockRunStore([]);
const executor = new TestableExecutor(
makeContext({
previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)],
runStore,
}),
);

const messages = await executor.buildPreviousStepsMessages();
const content = messages[0].content as string;

expect(content).toContain('Step "cond-1"');
expect(content).toContain('History:');
});

it('does not surface executions of steps absent from previousSteps (dead branch)', async () => {
const runStore = makeMockRunStore([
{
type: 'condition',
stepIndex: 0,
executionParams: { answer: 'Yes', reasoning: 'LIVE-INPUT' },
executionResult: { answer: 'Yes' },
},
{
type: 'condition',
stepIndex: 5,
executionParams: { answer: 'No', reasoning: 'DEAD-INPUT' },
executionResult: { answer: 'No' },
},
]);
const executor = new TestableExecutor(
makeContext({
previousSteps: [makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0 })],
runStore,
}),
);

const messages = await executor.buildPreviousStepsMessages();
const content = messages[0].content as string;

expect(content).toContain('LIVE-INPUT');
expect(content).not.toContain('DEAD-INPUT');
});
});

describe('execute error handling', () => {
it('converts NoRecordsError to error outcome', async () => {
const executor = new TestableExecutor(makeContext(), new NoRecordsError());
Expand Down
Loading
Loading