Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ src/
- **Idempotency in mutating executors** — `update-record`, `trigger-action`, and `mcp` executors protect against duplicate side effects via a write-ahead log in the `RunStore`. Before the side effect fires, the executor saves `idempotencyPhase: 'executing'`. After, it saves `idempotencyPhase: 'done'` alongside the normal `executionResult`. On re-dispatch (same `runId + stepIndex`): `done` → reconstruct success outcome via `buildOutcomeResult` without re-executing or emitting an activity log; `executing` → throw `StepStateError` (user retries manually, also no activity log). The `checkIdempotency()` hook in `BaseStepExecutor` is called before `runWithActivityLog()` so neither cache hits nor uncertain-state errors emit activity log entries. Non-mutating executors (`condition`, `read-record`, `guidance`, `load-related-record`) do not override `checkIdempotency()` — replaying them is safe.
- **Fetched steps must be executed** — Any step retrieved from the orchestrator via `getAvailableRuns()` must be executed. Silently discarding a fetched step (e.g. filtering it out by `runId` after fetching) violates the executor contract: the orchestrator assumes execution is guaranteed once the step is dispatched. The only valid filter before executing is deduplication via `inFlightRuns` (keyed by `runId`, to avoid running the same run twice concurrently; the key is the run, not the step, because a chain advances the `stepId` between iterations).
- **Auto-chain from `/update-step` response** — `WorkflowPort.updateStepExecution` returns `AvailableRunDispatch | null`: when non-null, the `Runner` executes the next step inline instead of waiting for the next poll. The chain exits on `null` (awaiting-input / finished / error), on a non-progressing `stepIndex` (server bug defense), at `maxChainDepth` (config, default 50), or when `stop()` is called. Each chained step uses the `forestServerToken` from its own dispatch — token freshness is preserved across the chain. The port retries `POST /update-step` on transient failures (network, 5xx) — this relies on server-side idempotency: the orchestrator MUST deduplicate identical outcomes for a given `(runId, stepIndex)` to prevent double side-effects on retry.
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded **technical names** (`fieldName`/`fieldNames`/`actionName`/`relationName`) directly instead of invoking the AI — the orchestrator→executor wire references fields/relations/actions by their stable technical name, never by the mutable, non-unique `displayName`. The `displayName` persisted in the RunStore is always resolved from the live schema at execution time (still persisted for the AI and for the front — see "displayName in AI tools"). Technical names are matched exactly against the schema (`findFieldByTechnicalName` / the exact action lookup) — the displayName + fuzzy tolerances of `findField` are reserved for AI-returned names, so a technical name can't resolve to a different field whose displayName collides. Each record step type has its own typed `preRecordedArgs` shape. An unresolvable name throws `FieldNotFoundError` / `ActionNotFoundError` / `RelationNotFoundError` (read-record instead throws `NoResolvedFieldsError`, only when *no* field resolves — individual misses are surfaced per-field). Malformed arg shapes — e.g. `fieldName` without `value`, or an out-of-range `selectedRecordStepIndex` — throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class.
- **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor.
- **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
preRecordedArgs?.selectedRecordStepIndex,
);
const schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const args = preRecordedArgs?.relationDisplayName
? { relationName: preRecordedArgs.relationDisplayName }
: await this.selectRelation(schema, step.prompt);
const target = this.buildTarget(schema, args.relationName, selectedRecordRef);
const recordedRelation = preRecordedArgs?.relationName;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preRecordedArgs will be very useful for run-to-build move, when the orchestrator wants to inject inputs instead to use ai.

const relationName =
recordedRelation ?? (await this.selectRelation(schema, step.prompt)).relationName;
const target = this.buildTarget(schema, relationName, selectedRecordRef);

// Branch B -- fully automated execution
if (step.executionType === StepExecutionMode.FullyAutomated) {
Expand All @@ -143,7 +143,7 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
relationName: string,
selectedRecordRef: RecordRef,
): RelationTarget {
const field = this.findField(schema, relationName);
const field = this.findFieldByTechnicalName(schema, relationName);

if (!field) {
throw new RelationNotFoundError(relationName, schema.collectionName);
Expand Down Expand Up @@ -469,7 +469,7 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
private async selectRelation(
schema: CollectionSchema,
prompt: string | undefined,
): Promise<{ relationName: string; reasoning: string }> {
): Promise<{ relationName: string }> {
const tool = this.buildSelectRelationTool(schema);
const messages = [
this.buildContextMessage(),
Expand All @@ -481,7 +481,12 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
new HumanMessage(`**Request**: ${prompt ?? 'Load the relevant related record.'}`),
];

return this.invokeWithTool<{ relationName: string; reasoning: string }>(messages, tool);
const { relationName } = await this.invokeWithTool<{ relationName: string; reasoning: string }>(
messages,
tool,
);

return { relationName: this.resolveAiFieldName(schema, relationName) };
}

private buildSelectRelationTool(schema: CollectionSchema): DynamicStructuredTool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { CreateActivityLogArgs } from '../ports/activity-log-port';
import type { StepExecutionResult } from '../types/execution-context';
import type { FieldReadResult } from '../types/step-execution-data';
import type { CollectionSchema } from '../types/validated/collection';
import type { CollectionSchema, FieldSchema } from '../types/validated/collection';
import type { ReadRecordStepDefinition } from '../types/validated/step-definition';

import { DynamicStructuredTool, HumanMessage, SystemMessage } from '@forestadmin/ai-proxy';
Expand Down Expand Up @@ -40,14 +40,19 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor<ReadRecor
preRecordedArgs?.selectedRecordStepIndex,
);
const schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const selectedDisplayNames =
preRecordedArgs?.fieldDisplayNames ?? (await this.selectFields(schema, step.prompt));
const resolvedFieldNames = selectedDisplayNames
.map(name => this.findField(schema, name)?.fieldName)
const fieldNames =
preRecordedArgs?.fieldNames ?? (await this.selectFields(schema, step.prompt));
const selectedFields = fieldNames.map(requested => ({
requested,
field: this.findFieldByTechnicalName(schema, requested),
}));

const resolvedFieldNames = selectedFields
.map(s => s.field?.fieldName)
.filter((name): name is string => name !== undefined);

if (resolvedFieldNames.length === 0) {
throw new NoResolvedFieldsError(selectedDisplayNames);
throw new NoResolvedFieldsError(selectedFields.map(s => s.requested));
}

const recordData = await this.agentPort.getRecord(
Expand All @@ -58,7 +63,7 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor<ReadRecor
},
this.context.user,
);
const fieldResults = this.formatFieldResults(recordData.values, schema, selectedDisplayNames);
const fieldResults = this.formatFieldResults(recordData.values, selectedFields);

await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'read-record',
Expand Down Expand Up @@ -88,9 +93,9 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor<ReadRecor
new HumanMessage(`**Request**: ${prompt ?? 'Read the relevant fields.'}`),
];

const args = await this.invokeWithTool<{ fieldNames: string[] }>(messages, tool);
const { fieldNames } = await this.invokeWithTool<{ fieldNames: string[] }>(messages, tool);

return args.fieldNames;
return fieldNames.map(name => this.resolveAiFieldName(schema, name));
}

private buildReadFieldTool(schema: CollectionSchema): DynamicStructuredTool {
Expand Down Expand Up @@ -123,13 +128,12 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor<ReadRecor

private formatFieldResults(
values: Record<string, unknown>,
schema: CollectionSchema,
fieldDisplayNames: string[],
selected: Array<{ requested: string; field: FieldSchema | undefined }>,
): FieldReadResult[] {
return fieldDisplayNames.map(name => {
const field = this.findField(schema, name);

if (!field) return { error: `Field not found: ${name}`, name, displayName: name };
return selected.map(({ requested, field }) => {
if (!field) {
return { error: `Field not found: ${requested}`, name: requested, displayName: requested };
}

return {
value: values[field.fieldName],
Expand Down
30 changes: 18 additions & 12 deletions packages/workflow-executor/src/executors/record-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,32 @@ export default abstract class RecordStepExecutor<
return schema;
}

protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined {
// LLMs occasionally return formatting variants of field names (e.g. "first_name" for
// "firstname", "full-name" for "Full Name") even though the tool schema declares them
// as literals. Fall back to a normalized comparison so a cosmetic variation doesn't
// fail an otherwise correct step.
const normalizeFieldName = (s: string) => s.toLowerCase().replace(/[\s_-]/g, '');
const normalized = normalizeFieldName(name);
protected findFieldByTechnicalName(
schema: CollectionSchema,
name: string | undefined,
): FieldSchema | undefined {
if (name === undefined) return undefined;

return schema.fields.find(f => f.fieldName === name);
}

// Map an AI-returned displayName back to its technical fieldName. LLMs occasionally return
// formatting variants (e.g. "first_name" for "firstname", "full-name" for "Full Name"), so fall
// back to a normalized comparison. On a miss, returns the raw name — the exact lookup downstream
// turns it into a loud error.
protected resolveAiFieldName(schema: CollectionSchema, name: string): string {
const exact =
schema.fields.find(f => f.displayName === name) ??
schema.fields.find(f => f.fieldName === name);
if (exact) return exact;
if (exact) return exact.fieldName;

const normalize = (s: string) => s.toLowerCase().replace(/[\s_-]/g, '');
const normalized = normalize(name);
const fuzzy = schema.fields.filter(
f =>
normalizeFieldName(f.displayName) === normalized ||
normalizeFieldName(f.fieldName) === normalized,
f => normalize(f.displayName) === normalized || normalize(f.fieldName) === normalized,
);

return fuzzy.length === 1 ? fuzzy[0] : undefined;
return fuzzy.length === 1 ? fuzzy[0].fieldName : name;
}

private async toRecordIdentifier(record: RecordRef): Promise<string> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { CreateActivityLogArgs } from '../ports/activity-log-port';
import type { StepExecutionResult } from '../types/execution-context';
import type { ActionRef, TriggerRecordActionStepExecutionData } from '../types/step-execution-data';
import type { CollectionSchema, RecordRef } from '../types/validated/collection';
import type { ActionSchema, CollectionSchema, RecordRef } from '../types/validated/collection';
import type { TriggerActionStepDefinition } from '../types/validated/step-definition';

import { DynamicStructuredTool, HumanMessage, SystemMessage } from '@forestadmin/ai-proxy';
Expand Down Expand Up @@ -108,11 +108,19 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
preRecordedArgs?.selectedRecordStepIndex,
);
const schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const args = preRecordedArgs?.actionDisplayName
? { actionName: preRecordedArgs.actionDisplayName }
: await this.selectAction(schema, step.prompt);
const name = this.resolveActionName(schema, args.actionName);
const target: ActionTarget = { selectedRecordRef, displayName: args.actionName, name };
const recordedAction = preRecordedArgs?.actionName;
const actionName = recordedAction ?? (await this.selectAction(schema, step.prompt)).actionName;
const action = this.findActionByTechnicalName(schema, actionName);

if (!action) {
throw new ActionNotFoundError(actionName, schema.collectionName);
}

const target: ActionTarget = {
selectedRecordRef,
displayName: action.displayName,
name: action.name,
};

// Branch B -- fully automated: executor runs the action itself, so it cannot
// handle forms (no UI to fill them). Reject form-bearing actions here. When the
Expand All @@ -121,7 +129,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
const { hasForm } = await this.agentPort.getActionFormInfo(
{
collection: selectedRecordRef.collectionName,
action: name,
action: target.name,
id: selectedRecordRef.recordId,
},
this.context.user,
Expand Down Expand Up @@ -197,7 +205,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
private async selectAction(
schema: CollectionSchema,
prompt: string | undefined,
): Promise<{ actionName: string; reasoning: string }> {
): Promise<{ actionName: string }> {
const tool = this.buildSelectActionTool(schema);
const messages = [
this.buildContextMessage(),
Expand All @@ -209,7 +217,12 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
new HumanMessage(`**Request**: ${prompt ?? 'Trigger the relevant action.'}`),
];

return this.invokeWithTool<{ actionName: string; reasoning: string }>(messages, tool);
const { actionName } = await this.invokeWithTool<{ actionName: string; reasoning: string }>(
messages,
tool,
);

return { actionName: this.findAction(schema, actionName)?.name ?? actionName };
}

private buildSelectActionTool(schema: CollectionSchema): DynamicStructuredTool {
Expand All @@ -235,15 +248,16 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
});
}

private resolveActionName(schema: CollectionSchema, displayName: string): string {
const action =
schema.actions.find(a => a.displayName === displayName) ??
schema.actions.find(a => a.name === displayName);

if (!action) {
throw new ActionNotFoundError(displayName, schema.collectionName);
}
private findActionByTechnicalName(
schema: CollectionSchema,
name: string,
): ActionSchema | undefined {
return schema.actions.find(a => a.name === name);
}

return action.name;
private findAction(schema: CollectionSchema, name: string): ActionSchema | undefined {
return (
schema.actions.find(a => a.displayName === name) ?? schema.actions.find(a => a.name === name)
);
}
}
Loading
Loading