Skip to content

实现自动化引擎插件化架构,支持能力无限扩展 #741

@hotlong

Description

@hotlong

目标

设计并实现一个基础的自动化引擎,允许通过插件持续扩展节点能力。

方案概述

  1. 基于 MicroKernel 插件架构:引擎作为一个 Kernel 插件,只负责 DAG 解析和节点分发。各类自动化能力(节点类型/触发器/连接器/AI/审批等)全部通过独立插件扩展,按需加载。
  2. NodeExecutor 扩展点:每种节点类型都是一个实现 NodeExecutor 的插件,注册到核心引擎后即生效。
  3. Service Registry 提供依赖注入和插件发现:插件通过 PluginContext.registerService/getService方式互相调用。引擎暴露为 'automation' 服务。
  4. Hook/Event 总线:流程执行前后、节点执行时均可通过 hook 拦截与观察。第三方插件可注入自定义处理逻辑。
  5. 依赖拓扑排序保证初始化顺序:插件声明 dependencies,保证能力插件在引擎启动后注册。

技术任务拆分

MVP

  • 基础引擎实现(AutomationEngine 类)
    • 支持 FlowSchema/DAG 流程执行
    • NodeExecutor & Trigger 接口(插件扩展点)
    • Plugin 封装,暴露为 'automation' service
  • CRUD 节点插件(get/create/update/delete_record)
  • 核心逻辑节点插件(decision/assignment/loop)
  • HTTP/Connector 节点插件(http_request/connector_action)
  • 插件化注册与热插拔能力

二期

  • Script/Await/AI/Approval/Schedule/Event 节点插件
  • Studio 可视化能力插件

示例代码

// 插件注册API示例
kernel.use(createAutomationPlugin()); // 核心引擎
kernel.use(createCrudNodesPlugin());  // CRUD能力
kernel.use(createLogicNodesPlugin()); // 条件/赋值/循环
kernel.use(createHttpConnectorPlugin()); // HTTP/集成
kernel.use(createAINodePlugin()); // AI节点

验收标准

  • 任意新节点类型或自动化能力均可通过新插件独立扩展
  • 插件注册后无需重启即可新增/热插拔功能
  • 扩展流程符合对象协议与ROADMAP规划

备注:开发任务完成后需补充 test,并及时更新 roadmap。


🏗️ 整体架构:基于 ObjectStack MicroKernel 的插件化 Automation Engine

ObjectStack 已经有一套成熟的 MicroKernel + Plugin 架构,自动化引擎应该 完全复用 这套体系,而不是另起炉灶。

核心思路

┌─────────────────────────────────────────────────────────┐
│                    ObjectKernel (LiteKernel)              │
│  ┌─────────────────────────────────────────────────────┐ │
│  │              Service Registry (DI Container)         │ │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────��───────────┐ │ │
│  │  │   db     │ │  http    │ │  automation (核心引擎) │ │ │
│  │  └──────────┘ └──────────┘ └──────────────────────┘ │ │
│  └─────────────────────────────────────────────────────┘ │
│  ┌─────────────────────────────────────────────────────┐ │
│  │              Hook / Event Bus                        │ │
│  │   automation:beforeExecute  automation:afterExecute   │ │
│  │   automation:nodeExecute    automation:flowRegistered  │ │
│  └─────────────────────────────────────────────────────┘ │
│                                                           │
│  Plugins:                                                 │
│  ┌─────────────┐ ┌──────────────┐ ┌───────────────────┐ │
│  │  core-nodes  │ │ connector-   │ │ ai-node-plugin    │ │
│  │  plugin      │ │ plugin       │ │                   │ │
│  └─────────────┘ └──────────────┘ └───────────────────┘ │
│  ┌─────────────┐ ┌──────────────┐ ┌───────────────────┐ │
│  │ schedule-   │ │ approval-    │ │ custom-script-    │ │
│  │ trigger     │ │ plugin       │ │ plugin            │ │
│  └─────────────┘ └──────────────┘ └───────────────────┘ │
└─────────────────────────────────────────────────────────┘

第一步:创建核心自动化引擎插件

这是唯一的「硬核」部分——它只做 DAG 解析 + 节点分发 + 变量流转,不实现任何具体节点逻辑。

// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { FlowParsed, FlowNodeParsed, FlowEdgeParsed } from '@objectstack/spec/automation';
import type { AutomationContext, AutomationResult, IAutomationService } from '@objectstack/spec/contracts';
import type { Logger } from '@objectstack/spec/contracts';

// ─── Node Executor Interface (插件扩展点) ───────────────────────────

/**
 * 每种节点类型对应一个 NodeExecutor。
 * 第三方插件只需实现这个接口,注册到引擎即可扩展自动化能力。
 */
export interface NodeExecutor {
    /** 对应 FlowNodeAction 枚举值 */
    readonly type: string;

    /**
     * 执行节点
     * @param node - 当前节点定义
     * @param variables - 流程变量上下文(可读写)
     * @param context - 触发上下文
     * @returns 执行结果(可包含输出数据、分支条件等)
     */
    execute(
        node: FlowNodeParsed,
        variables: Map<string, unknown>,
        context: AutomationContext,
    ): Promise<NodeExecutionResult>;
}

export interface NodeExecutionResult {
    success: boolean;
    output?: Record<string, unknown>;
    error?: string;
    /** 用于 decision 节点,返回选中的分支标签 */
    branchLabel?: string;
}

// ─── Trigger Interface (插件扩展点) ─────────────────────────────────

/**
 * 触发器接口。Schedule/Event/API 等触发器通过插件注册。
 */
export interface FlowTrigger {
    readonly type: string;
    start(flowName: string, callback: (ctx: AutomationContext) => Promise<void>): void;
    stop(flowName: string): void;
}

// ─── Core Automation Engine ─────────────────────────────────────────

export class AutomationEngine implements IAutomationService {
    private flows = new Map<string, FlowParsed>();
    private nodeExecutors = new Map<string, NodeExecutor>();
    private triggers = new Map<string, FlowTrigger>();
    private logger: Logger;

    constructor(logger: Logger) {
        this.logger = logger;
    }

    // ── 插件扩展 API ──────────────────────────────────

    /** 注册节点执行器(由插件调用) */
    registerNodeExecutor(executor: NodeExecutor): void {
        if (this.nodeExecutors.has(executor.type)) {
            this.logger.warn(`Node executor '${executor.type}' replaced`);
        }
        this.nodeExecutors.set(executor.type, executor);
        this.logger.info(`Node executor registered: ${executor.type}`);
    }

    /** 注册触发器(由插件调用) */
    registerTrigger(trigger: FlowTrigger): void {
        this.triggers.set(trigger.type, trigger);
        this.logger.info(`Trigger registered: ${trigger.type}`);
    }

    /** 获取已注册的所有节点类型 */
    getRegisteredNodeTypes(): string[] {
        return [...this.nodeExecutors.keys()];
    }

    // ── IAutomationService 合约实现 ────────────────────

    registerFlow(name: string, definition: unknown): void {
        // 用 FlowSchema 校验
        const { FlowSchema } = require('@objectstack/spec/automation');
        const parsed = FlowSchema.parse(definition);
        this.flows.set(name, parsed);
        this.logger.info(`Flow registered: ${name}`);
    }

    unregisterFlow(name: string): void {
        this.flows.delete(name);
    }

    async listFlows(): Promise<string[]> {
        return [...this.flows.keys()];
    }

    async execute(flowName: string, context?: AutomationContext): Promise<AutomationResult> {
        const startTime = Date.now();
        const flow = this.flows.get(flowName);

        if (!flow) {
            return { success: false, error: `Flow '${flowName}' not found` };
        }

        // 初始化变量上下文
        const variables = new Map<string, unknown>();
        if (flow.variables) {
            for (const v of flow.variables) {
                if (v.isInput && context?.params?.[v.name] !== undefined) {
                    variables.set(v.name, context.params[v.name]);
                }
            }
        }
        // 注入触发记录
        if (context?.record) {
            variables.set('$record', context.record);
        }

        try {
            // 找到 start 节点
            const startNode = flow.nodes.find(n => n.type === 'start');
            if (!startNode) {
                return { success: false, error: 'Flow has no start node' };
            }

            // DAG 遍历执行
            await this.executeNode(startNode, flow, variables, context ?? {});

            // 收集输出变量
            const output: Record<string, unknown> = {};
            if (flow.variables) {
                for (const v of flow.variables) {
                    if (v.isOutput) {
                        output[v.name] = variables.get(v.name);
                    }
                }
            }

            return {
                success: true,
                output,
                durationMs: Date.now() - startTime,
            };
        } catch (err: any) {
            // 错误处理策略
            if (flow.errorHandling?.strategy === 'retry') {
                return this.retryExecution(flow, flowName, context, startTime);
            }
            return {
                success: false,
                error: err.message,
                durationMs: Date.now() - startTime,
            };
        }
    }

    // ── DAG 遍历核心 ──────────────────────────────────

    private async executeNode(
        node: FlowNodeParsed,
        flow: FlowParsed,
        variables: Map<string, unknown>,
        context: AutomationContext,
    ): Promise<void> {
        if (node.type === 'end') return;

        // 查找执行器
        const executor = this.nodeExecutors.get(node.type);
        if (!executor) {
            // start 节点没有执行器也没关系,直接跳过
            if (node.type !== 'start') {
                throw new Error(`No executor registered for node type '${node.type}'`);
            }
        } else {
            // 执行节点
            const result = await executor.execute(node, variables, context);
            if (!result.success) {
                throw new Error(`Node '${node.id}' failed: ${result.error}`);
            }
            // 写回输出变量
            if (result.output) {
                for (const [key, value] of Object.entries(result.output)) {
                    variables.set(`${node.id}.${key}`, value);
                }
            }
        }

        // 查找下一个节点(按边条件过滤)
        const outEdges = flow.edges.filter(e => e.source === node.id);
        for (const edge of outEdges) {
            if (edge.condition && !this.evaluateCondition(edge.condition, variables)) {
                continue;
            }
            const nextNode = flow.nodes.find(n => n.id === edge.target);
            if (nextNode) {
                await this.executeNode(nextNode, flow, variables, context);
            }
        }
    }

    private evaluateCondition(expression: string, variables: Map<string, unknown>): boolean {
        // 简单的模板替换 + 表达式求值(MVP 版本)
        let resolved = expression;
        for (const [key, value] of variables) {
            resolved = resolved.replace(new RegExp(`\\{${key}\\}`, 'g'), String(value));
        }
        try {
            return new Function(`return (${resolved})`)() as boolean;
        } catch {
            return false;
        }
    }

    private async retryExecution(
        flow: FlowParsed,
        flowName: string,
        context: AutomationContext | undefined,
        startTime: number,
    ): Promise<AutomationResult> {
        const maxRetries = flow.errorHandling?.maxRetries ?? 3;
        const delay = flow.errorHandling?.retryDelayMs ?? 1000;

        for (let i = 0; i < maxRetries; i++) {
            await new Promise(r => setTimeout(r, delay));
            const result = await this.execute(flowName, context);
            if (result.success) return result;
        }
        return { success: false, error: 'Max retries exceeded', durationMs: Date.now() - startTime };
    }
}

第二步:将引擎封装为 Kernel Plugin

严格遵循 ObjectStack 的 Plugin 接口(initstartdestroy 三阶段生命周期):

// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { Plugin, PluginContext } from '@objectstack/core';
import { AutomationEngine } from './engine';

/**
 * Automation Plugin — 核心引擎插件
 * 
 * 职责:
 * 1. init 阶段:创建引擎实例,注册为 'automation' service
 * 2. start 阶段:激活所有已注册的 Trigger
 * 3. destroy 阶段:停止所有 Trigger,清理资源
 * 
 * 不实现任何具体节点 — 节点由其他插件通过引擎扩展 API 注册
 */
export function createAutomationPlugin(): Plugin {
    let engine: AutomationEngine;

    return {
        name: 'com.objectstack.service-automation',
        version: '1.0.0',
        type: 'standard',
        // 无依赖 — 其他插件依赖我
        dependencies: [],

        async init(ctx: PluginContext) {
            engine = new AutomationEngine(ctx.logger);

            // 注册为全局 service,其他插件通过 ctx.getService('automation') 获取
            ctx.registerService('automation', engine);

            // 注册 hook,允许其他插件在流程执行前后拦截
            ctx.hook('automation:beforeExecute', async (flowName: string, context: any) => {
                ctx.logger.debug(`[Automation] Before execute: ${flowName}`);
            });

            ctx.logger.info('[Automation] Engine initialized');
        },

        async start(ctx: PluginContext) {
            // 触发 hook 通知引擎就绪,其他插件可以开始注册节点
            await ctx.trigger('automation:ready', engine);
            ctx.logger.info(`[Automation] Engine started with node types: ${engine.getRegisteredNodeTypes().join(', ')}`);
        },

        async destroy() {
            // 清理资源
        },
    };
}

第三步:通过插件扩展节点能力

这是架构的核心亮点 — 每种能力都是一个独立插件。以下展示几个典型的扩展插件:

3.1 核心 CRUD 节点插件

import type { Plugin, PluginContext } from '@objectstack/core';
import type { AutomationEngine, NodeExecutor } from '../engine';

/**
 * CRUD Node Plugin — 提供 get_record / create_record / update_record / delete_record
 * 
 * 依赖: service-automation (引擎), objectql (数据层)
 */
export function createCrudNodesPlugin(): Plugin {
    return {
        name: 'com.objectstack.automation.crud-nodes',
        version: '1.0.0',
        dependencies: ['com.objectstack.service-automation'],

        async init(ctx: PluginContext) {
            const engine = ctx.getService<AutomationEngine>('automation');

            // 注册 get_record 节点执行器
            engine.registerNodeExecutor({
                type: 'get_record',
                async execute(node, variables, context) {
                    const config = node.config as any;
                    // 通过 ObjectQL 查询记录
                    // const ql = ctx.getService('objectql');
                    // const records = await ql.find(config.object, config.filters);
                    return {
                        success: true,
                        output: { records: [] }, // 实际从 ObjectQL 查询
                    };
                },
            });

            // 注册 create_record
            engine.registerNodeExecutor({
                type: 'create_record',
                async execute(node, variables) {
                    const config = node.config as any;
                    return { success: true, output: { id: 'new-record-id' } };
                },
            });

            // 注册 update_record / delete_record ... 同理
            engine.registerNodeExecutor({
                type: 'update_record',
                async execute(node, variables) {
                    return { success: true };
                },
            });

            engine.registerNodeExecutor({
                type: 'delete_record',
                async execute(node, variables) {
                    return { success: true };
                },
            });

            ctx.logger.info('[CRUD Nodes] 4 node executors registered');
        },
    };
}

3.2 Decision + Assignment 逻辑节点插件

import type { Plugin, PluginContext } from '@objectstack/core';
import type { AutomationEngine } from '../engine';

export function createLogicNodesPlugin(): Plugin {
    return {
        name: 'com.objectstack.automation.logic-nodes',
        version: '1.0.0',
        dependencies: ['com.objectstack.service-automation'],

        async init(ctx: PluginContext) {
            const engine = ctx.getService<AutomationEngine>('automation');

            // decision 节点 — 条件分支
            engine.registerNodeExecutor({
                type: 'decision',
                async execute(node, variables) {
                    const conditions = (node.config as any)?.conditions ?? [];
                    for (const cond of conditions) {
                        // 简化的条件求值
                        let expr = cond.expression as string;
                        for (const [k, v] of variables) {
                            expr = expr.replace(new RegExp(`\\{${k}\\}`, 'g'), String(v));
                        }
                        try {
                            if (new Function(`return (${expr})`)()) {
                                return { success: true, branchLabel: cond.label };
                            }
                        } catch { /* continue */ }
                    }
                    return { success: true, branchLabel: 'default' };
                },
            });

            // assignment 节点 — 设置变量
            engine.registerNodeExecutor({
                type: 'assignment',
                async execute(node, variables) {
                    const assignments = (node.config as any) ?? {};
                    for (const [key, value] of Object.entries(assignments)) {
                        variables.set(key, value);
                    }
                    return { success: true };
                },
            });

            // loop 节点
            engine.registerNodeExecutor({
                type: 'loop',
                async execute(node, variables) {
                    const collection = variables.get((node.config as any)?.collection as string);
                    if (Array.isArray(collection)) {
                        variables.set('$loopItems', collection);
                        variables.set('$loopIndex', 0);
                    }
                    return { success: true };
                },
            });
        },
    };
}

3.3 HTTP + Connector 集成插件

import type { Plugin, PluginContext } from '@objectstack/core';
import type { AutomationEngine } from '../engine';

export function createHttpConnectorPlugin(): Plugin {
    return {
        name: 'com.objectstack.automation.http-connector',
        version: '1.0.0',
        dependencies: ['com.objectstack.service-automation'],

        async init(ctx: PluginContext) {
            const engine = ctx.getService<AutomationEngine>('automation');

            // http_request 节点
            engine.registerNodeExecutor({
                type: 'http_request',
                async execute(node, variables) {
                    const config = node.config as any;
                    const response = await fetch(config.url, {
                        method: config.method ?? 'GET',
                        headers: config.headers,
                        body: config.body ? JSON.stringify(config.body) : undefined,
                    });
                    const data = await response.json();
                    return { success: response.ok, output: { response: data } };
                },
            });

            // connector_action 节点 — 调用已注册的 Connector
            engine.registerNodeExecutor({
                type: 'connector_action',
                async execute(node, variables) {
                    const { connectorId, actionId, input } = node.connectorConfig ?? {} as any;
                    // 实际实现会从 connector registry 获取 connector 并执行
                    ctx.logger.info(`Connector action: ${connectorId}.${actionId}`);
                    return { success: true, output: { connectorResult: {} } };
                },
            });
        },
    };
}

3.4 AI 任务节点插件(未来扩展示例)

import type { Plugin, PluginContext } from '@objectstack/core';
import type { AutomationEngine } from '../engine';
import type { IAIService } from '@objectstack/spec/contracts';

/**
 * AI Node Plugin — 在自动化流程中调用 LLM / Agent
 * 这就是通过插件不断扩展自动化能力的典型示例
 */
export function createAINodePlugin(): Plugin {
    return {
        name: 'com.objectstack.automation.ai-node',
        version: '1.0.0',
        dependencies: [
            'com.objectstack.service-automation',
            'com.objectstack.service-ai',  // 依赖 AI 服务
        ],

        async init(ctx: PluginContext) {
            const engine = ctx.getService<AutomationEngine>('automation');
            const ai = ctx.getService<IAIService>('ai');

            engine.registerNodeExecutor({
                type: 'ai_task',
                async execute(node, variables) {
                    const config = node.config as any;
                    const result = await ai.complete(config.prompt, {
                        model: config.model,
                        temperature: config.temperature ?? 0.7,
                    });
                    return {
                        success: true,
                        output: { aiResponse: result.content },
                    };
                },
            });
        },
    };
}

第四步:组装 — 在 Kernel 中注册所有插件

这就是 ObjectStack 的 LiteKernel 用法,与现有代码完全一致:

import { LiteKernel } from '@objectstack/core';
import { createAutomationPlugin } from './plugin';
import { createCrudNodesPlugin } from './plugins/crud-nodes-plugin';
import { createLogicNodesPlugin } from './plugins/logic-nodes-plugin';
import { createHttpConnectorPlugin } from './plugins/http-connector-plugin';
import { createAINodePlugin } from './plugins/ai-node-plugin';

const kernel = new LiteKernel();

// 1️⃣ 核心引擎(必须第一个)
kernel.use(createAutomationPlugin());

// 2️⃣ 内置节点能力(按需加载)
kernel.use(createCrudNodesPlugin());
kernel.use(createLogicNodesPlugin());
kernel.use(createHttpConnectorPlugin());

// 3️⃣ 扩展能力(可选,按需添加)
// kernel.use(createAINodePlugin());         // AI 节点
// kernel.use(createScheduleTriggerPlugin()); // 定时触发
// kernel.use(createApprovalNodePlugin());    // 审批节点
// kernel.use(createScriptNodePlugin());      // 脚本节点

// 🚀 启动
await kernel.bootstrap();

// 执行流程
const automation = kernel.getService<IAutomationService>('automation');
const result = await automation.execute('my_approval_flow', {
    record: { id: 'rec-1', amount: 50000 },
    object: 'opportunity',
    event: 'on_create',
});

第五步:扩展能力一览(插件清单)

基于上述架构,添加新的自动化能力只需要

  1. 实现 NodeExecutor 接口
  2. 封装为 Plugin
  3. kernel.use(yourPlugin) 注册
插件名 注册的节点类型 复杂度 阶段
crud-nodes-plugin get/create/update/delete_record ⭐⭐ MVP
logic-nodes-plugin decision/assignment/loop ⭐⭐ MVP
http-connector-plugin http_request/connector_action ⭐⭐ MVP
script-node-plugin script (JS/TS sandbox) ⭐⭐⭐ MVP
subflow-plugin subflow (嵌套流程调用) ⭐⭐⭐ Phase 2
screen-node-plugin screen (人机交互) ⭐⭐⭐⭐ Phase 2
wait-node-plugin wait (延迟/等待事件) ⭐⭐ Phase 2
ai-node-plugin ai_task/agent_call ⭐⭐⭐ Phase 3
approval-plugin approval (多级审批) ⭐⭐⭐⭐ Phase 3
schedule-trigger-plugin FlowTrigger: schedule ⭐⭐ Phase 2
event-trigger-plugin FlowTrigger: record_change ⭐⭐⭐ Phase 2

为什么这个架构能 work?

关键在于 ObjectStack 已有的三大基础设施完美匹配:

  1. PluginContext.registerService() — 引擎通过 service 暴露给所有插件

    registerService(name: string, service: any): void;
    getService<T>(name: string): T;
  2. PluginContext.hook() / trigger() — 流程执行前后的拦截点

    hook(name: string, handler: (...args: any[]) => void | Promise<void>): void;
    trigger(name: string, ...args: any[]): Promise<void>;
  3. Plugin.dependencies[] — 拓扑排序保证引擎先于节点插件初始化

    dependencies?: string[];

这样,引擎本身永远保持精简(~200 行),所有节点能力通过插件 按需加载、热插拔。需要新能力?写一个 Plugin,注册一个 NodeExecutor,done。

Metadata

Metadata

Labels

enhancementNew feature or request

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions