-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
enhancementNew feature or requestNew feature or request
Description
目标
设计并实现一个基础的自动化引擎,允许通过插件持续扩展节点能力。
方案概述
- 基于 MicroKernel 插件架构:引擎作为一个 Kernel 插件,只负责 DAG 解析和节点分发。各类自动化能力(节点类型/触发器/连接器/AI/审批等)全部通过独立插件扩展,按需加载。
- NodeExecutor 扩展点:每种节点类型都是一个实现 NodeExecutor 的插件,注册到核心引擎后即生效。
- Service Registry 提供依赖注入和插件发现:插件通过 PluginContext.registerService/getService方式互相调用。引擎暴露为 'automation' 服务。
- Hook/Event 总线:流程执行前后、节点执行时均可通过 hook 拦截与观察。第三方插件可注入自定义处理逻辑。
- 依赖拓扑排序保证初始化顺序:插件声明 dependencies,保证能力插件在引擎启动后注册。
技术任务拆分
MVP
- 基础引擎实现(AutomationEngine 类)
- 支持 FlowSchema/DAG 流程执行
- NodeExecutor & Trigger 接口(插件扩展点)
- Plugin 封装,暴露为 'automation' service
- CRUD 节点插件(get/create/update/delete_record)
- 核心逻辑节点插件(decision/assignment/loop)
- HTTP/Connector 节点插件(http_request/connector_action)
- 插件化注册与热插拔能力
二期
- Script/Await/AI/Approval/Schedule/Event 节点插件
- Studio 可视化能力插件
示例代码
// 插件注册API示例
kernel.use(createAutomationPlugin()); // 核心引擎
kernel.use(createCrudNodesPlugin()); // CRUD能力
kernel.use(createLogicNodesPlugin()); // 条件/赋值/循环
kernel.use(createHttpConnectorPlugin()); // HTTP/集成
kernel.use(createAINodePlugin()); // AI节点验收标准
- 任意新节点类型或自动化能力均可通过新插件独立扩展
- 插件注册后无需重启即可新增/热插拔功能
- 扩展流程符合对象协议与ROADMAP规划
备注:开发任务完成后需补充 test,并及时更新 roadmap。
🏗️ 整体架构:基于 ObjectStack MicroKernel 的插件化 Automation Engine
ObjectStack 已经有一套成熟的 MicroKernel + Plugin 架构,自动化引擎应该 完全复用 这套体系,而不是另起炉灶。
核心思路
┌─────────────────────────────────────────────────────────┐
│ ObjectKernel (LiteKernel) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Service Registry (DI Container) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────��───────────┐ │ │
│ │ │ db │ │ http │ │ automation (核心引擎) │ │ │
│ │ └──────────┘ └──────────┘ └──────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Hook / Event Bus │ │
│ │ automation:beforeExecute automation:afterExecute │ │
│ │ automation:nodeExecute automation:flowRegistered │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Plugins: │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────────────┐ │
│ │ core-nodes │ │ connector- │ │ ai-node-plugin │ │
│ │ plugin │ │ plugin │ │ │ │
│ └─────────────┘ └──────────────┘ └───────────────────┘ │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────────────┐ │
│ │ schedule- │ │ approval- │ │ custom-script- │ │
│ │ trigger │ │ plugin │ │ plugin │ │
│ └─────────────┘ └──────────────┘ └───────────────────┘ │
└─────────────────────────────────────────────────────────┘
第一步:创建核心自动化引擎插件
这是唯一的「硬核」部分——它只做 DAG 解析 + 节点分发 + 变量流转,不实现任何具体节点逻辑。
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.
import type { FlowParsed, FlowNodeParsed, FlowEdgeParsed } from '@objectstack/spec/automation';
import type { AutomationContext, AutomationResult, IAutomationService } from '@objectstack/spec/contracts';
import type { Logger } from '@objectstack/spec/contracts';
// ─── Node Executor Interface (插件扩展点) ───────────────────────────
/**
* 每种节点类型对应一个 NodeExecutor。
* 第三方插件只需实现这个接口,注册到引擎即可扩展自动化能力。
*/
export interface NodeExecutor {
/** 对应 FlowNodeAction 枚举值 */
readonly type: string;
/**
* 执行节点
* @param node - 当前节点定义
* @param variables - 流程变量上下文(可读写)
* @param context - 触发上下文
* @returns 执行结果(可包含输出数据、分支条件等)
*/
execute(
node: FlowNodeParsed,
variables: Map<string, unknown>,
context: AutomationContext,
): Promise<NodeExecutionResult>;
}
export interface NodeExecutionResult {
success: boolean;
output?: Record<string, unknown>;
error?: string;
/** 用于 decision 节点,返回选中的分支标签 */
branchLabel?: string;
}
// ─── Trigger Interface (插件扩展点) ─────────────────────────────────
/**
* 触发器接口。Schedule/Event/API 等触发器通过插件注册。
*/
export interface FlowTrigger {
readonly type: string;
start(flowName: string, callback: (ctx: AutomationContext) => Promise<void>): void;
stop(flowName: string): void;
}
// ─── Core Automation Engine ─────────────────────────────────────────
export class AutomationEngine implements IAutomationService {
private flows = new Map<string, FlowParsed>();
private nodeExecutors = new Map<string, NodeExecutor>();
private triggers = new Map<string, FlowTrigger>();
private logger: Logger;
constructor(logger: Logger) {
this.logger = logger;
}
// ── 插件扩展 API ──────────────────────────────────
/** 注册节点执行器(由插件调用) */
registerNodeExecutor(executor: NodeExecutor): void {
if (this.nodeExecutors.has(executor.type)) {
this.logger.warn(`Node executor '${executor.type}' replaced`);
}
this.nodeExecutors.set(executor.type, executor);
this.logger.info(`Node executor registered: ${executor.type}`);
}
/** 注册触发器(由插件调用) */
registerTrigger(trigger: FlowTrigger): void {
this.triggers.set(trigger.type, trigger);
this.logger.info(`Trigger registered: ${trigger.type}`);
}
/** 获取已注册的所有节点类型 */
getRegisteredNodeTypes(): string[] {
return [...this.nodeExecutors.keys()];
}
// ── IAutomationService 合约实现 ────────────────────
registerFlow(name: string, definition: unknown): void {
// 用 FlowSchema 校验
const { FlowSchema } = require('@objectstack/spec/automation');
const parsed = FlowSchema.parse(definition);
this.flows.set(name, parsed);
this.logger.info(`Flow registered: ${name}`);
}
unregisterFlow(name: string): void {
this.flows.delete(name);
}
async listFlows(): Promise<string[]> {
return [...this.flows.keys()];
}
async execute(flowName: string, context?: AutomationContext): Promise<AutomationResult> {
const startTime = Date.now();
const flow = this.flows.get(flowName);
if (!flow) {
return { success: false, error: `Flow '${flowName}' not found` };
}
// 初始化变量上下文
const variables = new Map<string, unknown>();
if (flow.variables) {
for (const v of flow.variables) {
if (v.isInput && context?.params?.[v.name] !== undefined) {
variables.set(v.name, context.params[v.name]);
}
}
}
// 注入触发记录
if (context?.record) {
variables.set('$record', context.record);
}
try {
// 找到 start 节点
const startNode = flow.nodes.find(n => n.type === 'start');
if (!startNode) {
return { success: false, error: 'Flow has no start node' };
}
// DAG 遍历执行
await this.executeNode(startNode, flow, variables, context ?? {});
// 收集输出变量
const output: Record<string, unknown> = {};
if (flow.variables) {
for (const v of flow.variables) {
if (v.isOutput) {
output[v.name] = variables.get(v.name);
}
}
}
return {
success: true,
output,
durationMs: Date.now() - startTime,
};
} catch (err: any) {
// 错误处理策略
if (flow.errorHandling?.strategy === 'retry') {
return this.retryExecution(flow, flowName, context, startTime);
}
return {
success: false,
error: err.message,
durationMs: Date.now() - startTime,
};
}
}
// ── DAG 遍历核心 ──────────────────────────────────
private async executeNode(
node: FlowNodeParsed,
flow: FlowParsed,
variables: Map<string, unknown>,
context: AutomationContext,
): Promise<void> {
if (node.type === 'end') return;
// 查找执行器
const executor = this.nodeExecutors.get(node.type);
if (!executor) {
// start 节点没有执行器也没关系,直接跳过
if (node.type !== 'start') {
throw new Error(`No executor registered for node type '${node.type}'`);
}
} else {
// 执行节点
const result = await executor.execute(node, variables, context);
if (!result.success) {
throw new Error(`Node '${node.id}' failed: ${result.error}`);
}
// 写回输出变量
if (result.output) {
for (const [key, value] of Object.entries(result.output)) {
variables.set(`${node.id}.${key}`, value);
}
}
}
// 查找下一个节点(按边条件过滤)
const outEdges = flow.edges.filter(e => e.source === node.id);
for (const edge of outEdges) {
if (edge.condition && !this.evaluateCondition(edge.condition, variables)) {
continue;
}
const nextNode = flow.nodes.find(n => n.id === edge.target);
if (nextNode) {
await this.executeNode(nextNode, flow, variables, context);
}
}
}
private evaluateCondition(expression: string, variables: Map<string, unknown>): boolean {
// 简单的模板替换 + 表达式求值(MVP 版本)
let resolved = expression;
for (const [key, value] of variables) {
resolved = resolved.replace(new RegExp(`\\{${key}\\}`, 'g'), String(value));
}
try {
return new Function(`return (${resolved})`)() as boolean;
} catch {
return false;
}
}
private async retryExecution(
flow: FlowParsed,
flowName: string,
context: AutomationContext | undefined,
startTime: number,
): Promise<AutomationResult> {
const maxRetries = flow.errorHandling?.maxRetries ?? 3;
const delay = flow.errorHandling?.retryDelayMs ?? 1000;
for (let i = 0; i < maxRetries; i++) {
await new Promise(r => setTimeout(r, delay));
const result = await this.execute(flowName, context);
if (result.success) return result;
}
return { success: false, error: 'Max retries exceeded', durationMs: Date.now() - startTime };
}
}第二步:将引擎封装为 Kernel Plugin
严格遵循 ObjectStack 的 Plugin 接口(init → start → destroy 三阶段生命周期):
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.
import type { Plugin, PluginContext } from '@objectstack/core';
import { AutomationEngine } from './engine';
/**
* Automation Plugin — 核心引擎插件
*
* 职责:
* 1. init 阶段:创建引擎实例,注册为 'automation' service
* 2. start 阶段:激活所有已注册的 Trigger
* 3. destroy 阶段:停止所有 Trigger,清理资源
*
* 不实现任何具体节点 — 节点由其他插件通过引擎扩展 API 注册
*/
export function createAutomationPlugin(): Plugin {
let engine: AutomationEngine;
return {
name: 'com.objectstack.service-automation',
version: '1.0.0',
type: 'standard',
// 无依赖 — 其他插件依赖我
dependencies: [],
async init(ctx: PluginContext) {
engine = new AutomationEngine(ctx.logger);
// 注册为全局 service,其他插件通过 ctx.getService('automation') 获取
ctx.registerService('automation', engine);
// 注册 hook,允许其他插件在流程执行前后拦截
ctx.hook('automation:beforeExecute', async (flowName: string, context: any) => {
ctx.logger.debug(`[Automation] Before execute: ${flowName}`);
});
ctx.logger.info('[Automation] Engine initialized');
},
async start(ctx: PluginContext) {
// 触发 hook 通知引擎就绪,其他插件可以开始注册节点
await ctx.trigger('automation:ready', engine);
ctx.logger.info(`[Automation] Engine started with node types: ${engine.getRegisteredNodeTypes().join(', ')}`);
},
async destroy() {
// 清理资源
},
};
}第三步:通过插件扩展节点能力
这是架构的核心亮点 — 每种能力都是一个独立插件。以下展示几个典型的扩展插件:
3.1 核心 CRUD 节点插件
import type { Plugin, PluginContext } from '@objectstack/core';
import type { AutomationEngine, NodeExecutor } from '../engine';
/**
* CRUD Node Plugin — 提供 get_record / create_record / update_record / delete_record
*
* 依赖: service-automation (引擎), objectql (数据层)
*/
export function createCrudNodesPlugin(): Plugin {
return {
name: 'com.objectstack.automation.crud-nodes',
version: '1.0.0',
dependencies: ['com.objectstack.service-automation'],
async init(ctx: PluginContext) {
const engine = ctx.getService<AutomationEngine>('automation');
// 注册 get_record 节点执行器
engine.registerNodeExecutor({
type: 'get_record',
async execute(node, variables, context) {
const config = node.config as any;
// 通过 ObjectQL 查询记录
// const ql = ctx.getService('objectql');
// const records = await ql.find(config.object, config.filters);
return {
success: true,
output: { records: [] }, // 实际从 ObjectQL 查询
};
},
});
// 注册 create_record
engine.registerNodeExecutor({
type: 'create_record',
async execute(node, variables) {
const config = node.config as any;
return { success: true, output: { id: 'new-record-id' } };
},
});
// 注册 update_record / delete_record ... 同理
engine.registerNodeExecutor({
type: 'update_record',
async execute(node, variables) {
return { success: true };
},
});
engine.registerNodeExecutor({
type: 'delete_record',
async execute(node, variables) {
return { success: true };
},
});
ctx.logger.info('[CRUD Nodes] 4 node executors registered');
},
};
}3.2 Decision + Assignment 逻辑节点插件
import type { Plugin, PluginContext } from '@objectstack/core';
import type { AutomationEngine } from '../engine';
export function createLogicNodesPlugin(): Plugin {
return {
name: 'com.objectstack.automation.logic-nodes',
version: '1.0.0',
dependencies: ['com.objectstack.service-automation'],
async init(ctx: PluginContext) {
const engine = ctx.getService<AutomationEngine>('automation');
// decision 节点 — 条件分支
engine.registerNodeExecutor({
type: 'decision',
async execute(node, variables) {
const conditions = (node.config as any)?.conditions ?? [];
for (const cond of conditions) {
// 简化的条件求值
let expr = cond.expression as string;
for (const [k, v] of variables) {
expr = expr.replace(new RegExp(`\\{${k}\\}`, 'g'), String(v));
}
try {
if (new Function(`return (${expr})`)()) {
return { success: true, branchLabel: cond.label };
}
} catch { /* continue */ }
}
return { success: true, branchLabel: 'default' };
},
});
// assignment 节点 — 设置变量
engine.registerNodeExecutor({
type: 'assignment',
async execute(node, variables) {
const assignments = (node.config as any) ?? {};
for (const [key, value] of Object.entries(assignments)) {
variables.set(key, value);
}
return { success: true };
},
});
// loop 节点
engine.registerNodeExecutor({
type: 'loop',
async execute(node, variables) {
const collection = variables.get((node.config as any)?.collection as string);
if (Array.isArray(collection)) {
variables.set('$loopItems', collection);
variables.set('$loopIndex', 0);
}
return { success: true };
},
});
},
};
}3.3 HTTP + Connector 集成插件
import type { Plugin, PluginContext } from '@objectstack/core';
import type { AutomationEngine } from '../engine';
export function createHttpConnectorPlugin(): Plugin {
return {
name: 'com.objectstack.automation.http-connector',
version: '1.0.0',
dependencies: ['com.objectstack.service-automation'],
async init(ctx: PluginContext) {
const engine = ctx.getService<AutomationEngine>('automation');
// http_request 节点
engine.registerNodeExecutor({
type: 'http_request',
async execute(node, variables) {
const config = node.config as any;
const response = await fetch(config.url, {
method: config.method ?? 'GET',
headers: config.headers,
body: config.body ? JSON.stringify(config.body) : undefined,
});
const data = await response.json();
return { success: response.ok, output: { response: data } };
},
});
// connector_action 节点 — 调用已注册的 Connector
engine.registerNodeExecutor({
type: 'connector_action',
async execute(node, variables) {
const { connectorId, actionId, input } = node.connectorConfig ?? {} as any;
// 实际实现会从 connector registry 获取 connector 并执行
ctx.logger.info(`Connector action: ${connectorId}.${actionId}`);
return { success: true, output: { connectorResult: {} } };
},
});
},
};
}3.4 AI 任务节点插件(未来扩展示例)
import type { Plugin, PluginContext } from '@objectstack/core';
import type { AutomationEngine } from '../engine';
import type { IAIService } from '@objectstack/spec/contracts';
/**
* AI Node Plugin — 在自动化流程中调用 LLM / Agent
* 这就是通过插件不断扩展自动化能力的典型示例
*/
export function createAINodePlugin(): Plugin {
return {
name: 'com.objectstack.automation.ai-node',
version: '1.0.0',
dependencies: [
'com.objectstack.service-automation',
'com.objectstack.service-ai', // 依赖 AI 服务
],
async init(ctx: PluginContext) {
const engine = ctx.getService<AutomationEngine>('automation');
const ai = ctx.getService<IAIService>('ai');
engine.registerNodeExecutor({
type: 'ai_task',
async execute(node, variables) {
const config = node.config as any;
const result = await ai.complete(config.prompt, {
model: config.model,
temperature: config.temperature ?? 0.7,
});
return {
success: true,
output: { aiResponse: result.content },
};
},
});
},
};
}第四步:组装 — 在 Kernel 中注册所有插件
这就是 ObjectStack 的 LiteKernel 用法,与现有代码完全一致:
import { LiteKernel } from '@objectstack/core';
import { createAutomationPlugin } from './plugin';
import { createCrudNodesPlugin } from './plugins/crud-nodes-plugin';
import { createLogicNodesPlugin } from './plugins/logic-nodes-plugin';
import { createHttpConnectorPlugin } from './plugins/http-connector-plugin';
import { createAINodePlugin } from './plugins/ai-node-plugin';
const kernel = new LiteKernel();
// 1️⃣ 核心引擎(必须第一个)
kernel.use(createAutomationPlugin());
// 2️⃣ 内置节点能力(按需加载)
kernel.use(createCrudNodesPlugin());
kernel.use(createLogicNodesPlugin());
kernel.use(createHttpConnectorPlugin());
// 3️⃣ 扩展能力(可选,按需添加)
// kernel.use(createAINodePlugin()); // AI 节点
// kernel.use(createScheduleTriggerPlugin()); // 定时触发
// kernel.use(createApprovalNodePlugin()); // 审批节点
// kernel.use(createScriptNodePlugin()); // 脚本节点
// 🚀 启动
await kernel.bootstrap();
// 执行流程
const automation = kernel.getService<IAutomationService>('automation');
const result = await automation.execute('my_approval_flow', {
record: { id: 'rec-1', amount: 50000 },
object: 'opportunity',
event: 'on_create',
});第五步:扩展能力一览(插件清单)
基于上述架构,添加新的自动化能力只需要:
- 实现
NodeExecutor接口 - 封装为
Plugin kernel.use(yourPlugin)注册
| 插件名 | 注册的节点类型 | 复杂度 | 阶段 |
|---|---|---|---|
crud-nodes-plugin |
get/create/update/delete_record | ⭐⭐ | MVP |
logic-nodes-plugin |
decision/assignment/loop | ⭐⭐ | MVP |
http-connector-plugin |
http_request/connector_action | ⭐⭐ | MVP |
script-node-plugin |
script (JS/TS sandbox) | ⭐⭐⭐ | MVP |
subflow-plugin |
subflow (嵌套流程调用) | ⭐⭐⭐ | Phase 2 |
screen-node-plugin |
screen (人机交互) | ⭐⭐⭐⭐ | Phase 2 |
wait-node-plugin |
wait (延迟/等待事件) | ⭐⭐ | Phase 2 |
ai-node-plugin |
ai_task/agent_call | ⭐⭐⭐ | Phase 3 |
approval-plugin |
approval (多级审批) | ⭐⭐⭐⭐ | Phase 3 |
schedule-trigger-plugin |
FlowTrigger: schedule | ⭐⭐ | Phase 2 |
event-trigger-plugin |
FlowTrigger: record_change | ⭐⭐⭐ | Phase 2 |
为什么这个架构能 work?
关键在于 ObjectStack 已有的三大基础设施完美匹配:
-
PluginContext.registerService()— 引擎通过 service 暴露给所有插件registerService(name: string, service: any): void; getService<T>(name: string): T;
-
PluginContext.hook() / trigger()— 流程执行前后的拦截点hook(name: string, handler: (...args: any[]) => void | Promise<void>): void; trigger(name: string, ...args: any[]): Promise<void>;
-
Plugin.dependencies[]— 拓扑排序保证引擎先于节点插件初始化dependencies?: string[];
这样,引擎本身永远保持精简(~200 行),所有节点能力通过插件 按需加载、热插拔。需要新能力?写一个 Plugin,注册一个 NodeExecutor,done。
Reactions are currently unavailable
Metadata
Metadata
Labels
enhancementNew feature or requestNew feature or request