Skip to content

feat: 增强版的SubAgent功能#7108

Open
elecvoid243 wants to merge 65 commits intoAstrBotDevs:masterfrom
elecvoid243:master
Open

feat: 增强版的SubAgent功能#7108
elecvoid243 wants to merge 65 commits intoAstrBotDevs:masterfrom
elecvoid243:master

Conversation

@elecvoid243
Copy link
Copy Markdown

@elecvoid243 elecvoid243 commented Mar 28, 2026

Modifications / 改动点


一、概述

增强版SubAgent对AstrBot的Subagent进行了功能扩充,支持两种使用方式:

  1. 静态配置:通过 subagent_orchestrator.agents 在配置文件中预定义子代理(原版实现)
  2. 动态创建:主Agent在运行时通过工具调用动态创建子代理,可以随时回收,也可以每轮对话结束后自动清理

所有子代理(无论静态还是动态)都由统一的 SubAgentManager 管理,支持以下高级功能:独立上下文记忆、独立工具隔离、工作目录隔离、技能隔离、上下文共享区域、超时限制、后台任务模式等。

resolved #6954
resolved #7605

核心特性

特性 说明
统一实现 静态配置和动态创建的子代理都由 SubAgentManager 统一管理
动态创建 主Agent可通过create_subagent在运行时动态创建子代理
更好的后台任务模式 支持将耗时任务放到后台执行,主Agent可并行处理其他任务,也可通过wait_for_subagent阻塞式等待输出
自动清理 一轮对话结束时自动清理未受保护的子代理
上下文记忆 子代理可以保留跨轮对话历史
工作目录隔离 每个子代理可配置独立工作目录,并带有路径安全检查
Skills隔离 子代理可使用Skills,每个子代理拥有独立的技能配置
公共上下文 可为子代理启用公共上下文功能,共享信息和协调工作
行为规范注入 动态子代理自动注入安全模式、输出规范、时间信息等行为约束
数量限制 可配置的动态子代理最大数量限制(静态不受影响)
工具管理 除了主Agent动态分配的工具,用户可配置黑名单:防止子代理获得管理类工具;固有名单:子代理可拥有固有工具保证基础能力
执行超时控制 支持为子代理设置执行超时限制,避免无限等待

二、配置说明

2.1 配置项说明

cmd_config.json 中通过 subagent_orchestrator 配置块进行设置:

  "subagent_orchestrator": {
    "main_enable": true,
    "remove_main_duplicate_tools": false,
    "router_system_prompt": "You are a task router. Your job is to chat naturally, recognize user intent, and delegate work to the most suitable subagent using transfer_to_* tools. Do not try to use domain tools yourself. If no subagent fits, respond directly.",
    "agents": [
      {
        "name": "static_subagent",
        "persona_id": "测试人格",
        "public_description": "测试子代理(静态)",
        "enabled": true,
        "provider_id": "kimi-code/kimi-for-coding"
      }
    ],
    "dynamic_agents": {
      "enabled": true,
      "max_dynamic_subagent_count": 3,
      "auto_cleanup_per_turn": true,
      "tools_blacklist": [
        "send_shared_context_for_main_agent",
        "create_subagent",
        "protect_subagent",
        "unprotect_subagent",
        "reset_subagent",
        "remove_subagent",
        "list_subagents",
        "wait_for_subagent",
        "view_shared_context"
      ],
      "tools_inherent": [
        "astrbot_execute_shell",
        "astrbot_execute_python"
      ]
    },
    "history_enabled": true,
    "shared_context_enabled": true,
    "shared_context_maxlen": 300,
    "subagent_history_maxlen": 300,
    "execution_timeout": 1200
  }

2.2 配置项详解

配置项 类型 默认值 说明
main_enable bool false 总开关,启用SubAgent编排功能(静态+动态)
静态子代理(原版)
remove_main_duplicate_tools bool false 是否从主Agent移除已分配给子代理的工具(静态)
router_system_prompt str "" 主Agent路由提示词(仅启用静态时有效)
agents list [] 静态子代理配置列表
动态子代理(新增)
enabled bool false 是否允许主Agent动态创建子代理
max_dynamic_subagent_count int 3 单个会话允许的最大子代理数量,包含动态和静态
auto_cleanup_per_turn bool true 每轮对话结束后自动清理非保护子代理
tools_blacklist list 预设黑名单列表 动态subagent工具黑名单列表,例如禁止subagent获得管理工具
tools_inherent list 预设固有工具列表 动态subagent固有工具列表
通用配置
history_enabled bool true 是否启用子代理的上下文历史功能,为true可以保留跨多轮对话的记忆
shared_context_enabled bool true 是否启用子代理间公共上下文共享
shared_context_maxlen int 300 公共上下文消息最大数量(条);该项仅控制管理类的变量长度,实际子代理仍受到truncate逻辑控制
max_subagent_history int 300 每个子代理最多保留的历史消息(条);该项仅控制管理类的变量长度,实际子代理仍受到truncate逻辑控制
execution_timeout float 1200.0 SubAgent执行超时时间(秒),-1表示不限制

三、核心文件

3.1 subagent_manager.py

文件路径astrbot/core/subagent_manager.py

核心数据结构

@dataclass
class SubAgentConfig:
    name: str
    system_prompt: str = ""
    tools: set[str] | None = None        # 分配的工具名称集合
    skills: set[str] | None = None       # 分配的技能名称集合
    provider_id: str | None = None       # 可选的LLM提供商ID
    description: str = ""                # handoff工具描述
    workdir: str | None = None           # 子代理工作目录
    execution_timeout: float = 1200.0      # SubAgent 执行超时时间(秒)

@dataclass
class SubAgentExecutionResult:
    task_id: str          # 任务唯一标识符(递增数字字符串)
    agent_name: str
    success: bool
    result: str | None = None
    error: str | None = None
    execution_time: float = 0.0
    created_at: float = 0.0
    completed_at: float = 0.0
    metadata: dict = field(default_factory=dict)

@dataclass
class SubAgentSession:
    session_id: str
    subagents: dict = field(default_factory=dict)       # 存储SubAgentConfig对象
    handoff_tools: dict = field(default_factory=dict)
    subagent_status: dict = field(default_factory=dict) # 工作状态: "IDLE" "RUNNING" "COMPLETED" "FAILED"
    protected_agents: set = field(default_factory=set)  # 受保护agent不会被自动清理
    subagent_histories: dict = field(default_factory=dict) # 每个子代理的历史上下文
    shared_context: list = field(default_factory=list)  # 公共上下文列表
    shared_context_enabled: bool = False                # 是否启用公共上下文
    subagent_background_results: dict = field(default_factory=dict)  # 后台subagent结果存储
    background_task_counters: dict = field(default_factory=dict)  # 任务计数器

核心类SubAgentManager

方法 功能
configure() 配置管理器全局设置(数量限制、自动清理、公共上下文等)
get_execution_timeout() 获取SubAgent执行超时时间
is_auto_cleanup_per_turn() 检查是否启用自动清理
is_shared_context_enabled() 检查是否启用公共上下文
register_blacklisted_tool(tool_name) 注册不应被子Agent使用的工具
register_inherent_tool(tool_name) 注册子Agent默认拥有的工具
create_subagent(session_id, config, protected=False) 创建子代理,返回handoff工具
register_static_subagent(session_id, handoff_tool, skills, workdir) 注册静态子代理到管理器(自动protected=True)
remove_subagent(session_id, agent_name) 移除指定子代理或全部子代理
cleanup_session_turn_end(session_id) 每轮结束时自动清理非保护子代理
protect_subagent(session_id, agent_name) 保护子代理不被自动清理
is_protected(session_id, agent_name) 检查子代理是否受保护
update_subagent_history(session_id, agent_name, current_messages) 更新子代理历史(支持system消息过滤和tool结果截断)
get_subagent_history(session_id, agent_name) 获取子代理历史
clear_subagent_history(session_id, agent_name) 清除子代理历史
build_static_subagent_prompts(session_id, agent_name) 构建不会在会话内变化的子代理提示词(工作目录、行为规范)
build_dynamic_subagent_prompts(session_id, agent_name, runtime) 构建会话内可能变化的子代理提示词(Skills、公共上下文、时间)
build_task_router_prompt(session_id) 构建主Agent的路由提示词(配额信息、创建指南、生命周期等)
get_subagent_tools(session_id, agent_name) 获取子代理被分配的工具列表
add_shared_context(session_id, sender, context_type, content, target) 添加公共上下文消息
get_shared_context(session_id, filter_by_agent) 获取公共上下文(支持按Agent过滤)
_build_shared_context_prompt(session_id, agent_name) 分块构建公共上下文提示(按类型和优先级分组)
create_pending_subagent_task(session_id, agent_name) 为SubAgent创建pending任务,返回task_id
get_pending_subagent_tasks(session_id, agent_name) 获取SubAgent的所有pending任务ID列表
get_latest_task_id(session_id, agent_name) 获取SubAgent的最新任务ID
store_subagent_result(session_id, agent_name, success, result, task_id, error, execution_time, metadata) 存储SubAgent的执行结果
get_subagent_result(session_id, agent_name, task_id) 获取SubAgent的执行结果
has_subagent_result(session_id, agent_name, task_id) 检查SubAgent是否有结果
clear_subagent_result(session_id, agent_name, task_id) 清除SubAgent的执行结果
get_subagent_status(session_id, agent_name) 获取SubAgent的状态:IDLE/RUNNING/COMPLETED/FAILED
get_all_subagent_status(session_id) 获取所有SubAgent的状态
set_subagent_status(session_id, agent_name, status) 设置SubAgent的状态
cleanup_shared_context_by_agent(session_id, agent_name) 清理与指定Agent相关的公共上下文消息
clear_shared_context(session_id) 清除所有公共上下文
set_shared_context_enabled(session_id, enabled) 启用/禁用公共上下文
get_handoff_tools_for_session(session_id) 获取会话的所有handoff工具
cleanup_session(session_id) 清理整个会话

四、功能实现

4.1 静态子代理

功能描述

静态子代理是原版的子代理实现,由配置文件定义人格。

为了实现统一管理,静态子代理在 build_main_agent() 时自动注册到 SubAgentManager,可以享受历史记忆、公共上下文、Skills调用等增强功能(如果开启)。

实现方式

  • 文件astrbot/core/subagent_orchestrator.py
  • 核心方法SubAgentOrchestrator.register_static_subagents_to_manager()
  • 调用时机astr_main_agent.py_apply_subagent_tools()

注册流程

  1. build_main_agent() 构建主Agent时调用 _apply_subagent_tools()
  2. _apply_subagent_tools() 调用 so.register_static_subagents_to_manager(session_id)
  3. 遍历 SubAgentOrchestrator.handoffs 中的所有静态 handoff 工具
  4. 对每个 handoff 调用 SubAgentManager.register_static_subagent()
  5. register_static_subagent() 内部:
    • HandoffTool 提取 agent 信息
    • 创建 SubAgentConfig
    • 调用 create_subagent(session_id, config, protected=True)
  6. 静态子代理注册完成,自动受保护(防止自动清理)

4.2 动态子代理

功能描述

当动态子代理功能开启时,主Agent可以通过create_subagent工具动态创建子代理,每个子代理拥有独立的人设、工具、技能和工作目录配置。随后系统会创建对应的transfer_to_xxx工具

实现方式

  • 文件astrbot/core/subagent_manager.py
  • 核心类CreateSubAgentTool
  • 参数
    • name: 子代理名称
    • system_prompt: 子代理的人设和系统提示
    • tools: 可用工具列表(字符串名称)
    • skills: 可用技能列表(字符串名称)
    • workdir: 子代理工作目录(绝对路径,可选)

使用示例

create_subagent(
    name="data_analyst",
    system_prompt="你是一个专业的数据分析师,擅长分析数据并给出见解",
    tools=["astrbot_execute_shell", "astrbot_execute_python"],
    skills=["excel", "pdf"],
    workdir="/path/to/workspace"
)

名称验证规则

  1. 必须以字母开头
  2. 只允许英文字母、数字和下划线
  3. 长度3-32字符(正则:^[a-zA-Z][a-zA-Z0-9_]{0,31}$

创建后流程

  1. 验证名称格式和工作目录安全性
  2. 检查子代理数量是否达到上限
  3. 从工具列表中移除黑名单工具(如管理类工具)
  4. 自动添加固有工具(如 astrbot_execute_shell, astrbot_execute_python 。若公共上下文启用,还会添加 send_shared_context
  5. 创建 SubAgentConfig 配置对象
  6. 创建 AgentHandoffTool 对象
  7. 注册到 SubAgentManager
  8. 初始化子代理历史和状态("IDLE")
  9. 返回带有 __DYNAMIC_TOOL_CREATED__ 标记的消息,触发工具schema刷新

工作目录安全检查

CreateSubAgentTool._check_path_safety() 对传入的 workdir 进行安全验证:

检查项 说明
绝对路径 必须是绝对路径
路径遍历 不允许包含 ..
Windows危险目录 禁止访问 windows, system32, syswow64, boot
Linux危险目录 禁止访问 /etc, /bin, /sbin, /root
macOS危险目录 禁止访问 /System, /Library, /private/var, /usr
路径存在性 路径必须实际存在

验证失败时,工作目录回退到 get_astrbot_temp_path()

工具黑名单与固有工具

_tools_blacklist: set[str] = {
    "send_shared_context_for_main_agent",
    "create_subagent",
    "protect_subagent",
    "unprotect_subagent",
    "reset_subagent",
    "remove_subagent",
    "list_subagents",
    "wait_for_subagent",
    "view_shared_context",
}

_tools_inherent: set[str] = {
    "astrbot_execute_shell",
    "astrbot_execute_python",
}
  • 黑名单工具:创建子代理时自动从子代理工具列表中移除,防止子代理获得管理能力
  • 固有工具:创建子代理时自动添加到子代理工具列表,确保基础执行能力

4.3 动态子代理委派 (transfer_to_xxx)

功能描述

创建子代理后,主Agent使用 transfer_to_xxx 工具将任务委派给对应子代理。

实现方式

  • 文件astrbot/core/astr_agent_tool_exec.py
  • 核心方法FunctionToolExecutor._execute_handoff() / _execute_handoff_background()

子代理System Prompt构建

将子代理的system_prompt构建分为静态部分动态部分,尽可能增加缓存命中率

# 基础角色定义(新增)
subagent_system_prompt = f"# Role\nYour name is {agent_name}(used for tool calling)\n{tool.agent.instructions}\n"

# 静态部分(会话内不变)
static_prompt = SubAgentManager.build_static_subagent_prompts(umo, agent_name)
# 包含:工作目录信息、行为规范(安全模式、输出指南)

# 动态部分(每次调用重建)
dynamic_prompt = SubAgentManager.build_dynamic_subagent_prompts(umo, agent_name, runtime)
# 包含:Skills提示、公共上下文、当前时间

静态提示词 build_static_subagent_prompts()

  • 工作目录:注入独立的工作目录信息,限制文件操作范围
  • 行为规范:包括输出指南(超过2000字符保存文件)、安全模式拒绝

动态提示词 build_dynamic_subagent_prompts()

  • Skills提示:根据子代理分配的技能列表,过滤并构建Skills Prompt
  • 公共上下文:按优先级分组注入共享上下文
  • 时间信息:注入当前日期时间

委派执行流程

  1. 主Agent调用 transfer_to_xxx 工具
  2. ToolLoopAgentRunner._handle_function_tools()SubAgentManager 中查找handoff工具
  3. 匹配后返回完整的 HandoffTool 对象
  4. 调用 FunctionToolExecutor._execute_handoff() 执行委派
  5. 构建子代理工具集(包含分配的tools + runtime computer tools + 固有工具)
  6. 如果公有上下文启用,自动注入 send_shared_context 工具到子代理工具集
  7. 支持指定 provider_id 使用不同的LLM提供商
  8. 加载历史上下文(如果有),合并到contexts中
  9. 通过 _get_subagent_execution_timeout() 获取超时时间
  10. 通过 asyncio.wait_for() 添加执行超时控制
  11. 超时发生时调用 _handle_subagent_timeout() 将状态设为 "FAILED"
  12. 执行完成后,保存运行时消息到历史记录

历史上下文注入流程

  1. 子代理执行前,从 subagent_histories 获取历史消息
  2. 将历史消息转换为 Message 对象
  3. 历史消息插入到 begin_dialogs 之前
  4. 执行完成后,runner_messages(Agent运行期间的所有消息)追加到历史
  5. 过滤system消息,对超过2000字符的tool结果进行截断
  6. 历史消息总数不超过 max_subagent_history(默认300条),超出时保留最新的。(此处仅用于约束数组长度,实际上下文长度仍由astrbot的truncate管理)

执行超时控制

# 获取子代理的超时时间
execution_timeout = cls._get_subagent_execution_timeout()

# 添加执行超时控制
if execution_timeout > 0:
    try:
        llm_resp = await asyncio.wait_for(
            _run_subagent(), timeout=execution_timeout
        )
    except asyncio.TimeoutError:
        error_msg = f"SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds."
        logger.warning(f"[SubAgent:Timeout] {error_msg}")
        cls._handle_subagent_timeout(umo=umo, agent_name=agent_name)
        # 返回超时错误信息
        yield mcp.types.CallToolResult(
            content=[mcp.types.TextContent(type="text", text=f"error: {error_msg}")]
        )
        return
else:
    # 不设置超时
    llm_resp = await _run_subagent()

4.4 后台任务等待

功能描述

当主Agent认为某个任务耗时很长时,可以让SubAgent以后台模式运行,主Agent不会被阻塞,可继续执行其他可并行的任务

原版Subagent结束任务时,会通过_wake_main_agent_for_background_result唤醒主Agent

如果Subagent耗时很长,一些聪明的Agent可能会执行time.sleep()的python代码进行等待,但我们要假设Agent是愚蠢的。事实上,大多数情况下,主Agent会觉得迟迟拿不到结果而试图自己执行,导致同一任务完成两遍。

因此引入了一个wait_for_subagent工具,主Agent需要拿到Subagent结果时,可进行主动的阻塞式等待,避免此类情况发生

实现方式

  • 文件astrbot/core/astr_agent_tool_exec.py
  • 核心方法
    • _execute_handoff_background() - 执行后台委派
    • _do_handoff_background() - 后台任务执行逻辑
    • _register_subagent_task() - 注册SubAgent任务
    • _handle_subagent_background_result() - 处理SubAgent结果
    • _maybe_wake_main_agent_after_background() - 智能唤醒主Agent。Subagent完成任务时,如果主Agent已经结束运行,才执行_wake_main_agent_for_background_result,否则把结果存到subagent_background_results

执行流程

1. 主Agent调用 transfer_to_xxx(background_task=True)
2. 触发 _execute_handoff_background() 方法
3. 调用 _register_subagent_task() 在 SubAgentManager 中创建 pending 任务
   - 如果当前子代理状态为 RUNNING,不允许创建新任务
   - 生成递增的 task_id(数字字符串)
4. 设置子代理状态为 "RUNNING"
5. 返回 task_id 给主Agent(此时主Agent不会被阻塞)
6. SubAgent 在后台异步执行任务(_do_handoff_background)
7. 任务完成后:
   - 结果存储到 subagent_background_results
   - 更新子代理状态为 "COMPLETED" 或 "FAILED"
   - 调用 _maybe_wake_main_agent_after_background() 检查主Agent是否仍在运行
   - 仅在主Agent已结束时,通过 _wake_main_agent_for_background_result 通知用户

主Agent状态检测

后台任务完成后,需要决定是否通知用户。关键逻辑:

# 通过 main_agent_runner 判断主Agent是否仍在运行
context_extra = getattr(run_context.context, "extra", None)
main_agent_runner = context_extra.get("main_agent_runner") if context_extra else None
main_agent_is_running = main_agent_runner is not None and not main_agent_runner.done()

if main_agent_is_running:
    return False  # 主Agent仍在运行,不唤醒,主Agent会自己取出结果
else:
    await cls._wake_main_agent_for_background_result(...)  # 唤醒通知用户
    return True

这依赖 AstrAgentContext.extra 字段,在 build_main_agent() 中注入:

astr_agent_ctx = AstrAgentContext(
    context=plugin_context, event=event, extra={"main_agent_runner": agent_runner}
)

主动等待工具

# 等待特定任务的结果
wait_for_subagent(
    subagent_name="analyst",
    task_id="1",          # 可选,不填则获取最新任务结果
    timeout=60,           # 最大等待时间(秒)
    poll_interval=5       # 轮询间隔(秒)
)

WaitForSubagentTool 轮询逻辑

  1. 如果未指定 task_id,获取最新的 pending 任务
  2. 轮询子代理状态:
    • IDLE → 返回错误:子代理未在运行任务
    • COMPLETED → 返回执行结果
    • FAILED → 返回失败信息
    • RUNNING → 继续等待
  3. 超时后返回提示信息,可再次调用继续等待。

后台Subagent任务管理

每个subagent都会储存结果到subagent_background_results中。同一个subagent可以有多个任务,通过task_id来区分(每次创建任务递增)

subagent_background_results: dict = field(
        default_factory=dict
    )  # 后台subagent结果存储: {agent_name: {task_id: SubAgentExecutionResult}}
    # 任务计数器: {agent_name: next_task_id}
background_task_counters: dict = field(default_factory=dict)

创建任务

    @classmethod
    def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
        """为 SubAgent 创建一个 pending 任务,返回 task_id

        Args:
            session_id: Session ID
            agent_name: SubAgent 名称

        Returns:
            task_id: 任务ID,格式为简单的递增数字字符串
        """

获取结果

    @classmethod
    def get_subagent_result(
        cls, session_id: str, agent_name: str, task_id: str | None = None
    ) -> SubAgentExecutionResult | None:
        """获取 SubAgent 的执行结果

        Args:
            session_id: Session ID
            agent_name: SubAgent 名称
            task_id: 任务ID,如果为None则获取最新完成的任务结果

        Returns:
            SubAgentExecutionResult 或 None
        """

4.5 子代理历史记忆

功能描述

history_enabled=true时,子代理可以保留跨轮对话的历史上下文,实现连续对话能力。

实现方式

  • 存储结构SubAgentSession.subagent_histories
  • 配置max_subagent_history(默认300条)

历史管理机制

  1. 存储:每次子代理执行完成后,runner_messages(Agent运行期间的消息)追加到历史
  2. 过滤
    • 自动移除 role=system 的消息
    • 对超过 _MAX_TOOL_RESULT_LEN(2000字符)的tool结果截断,附加 ...[truncated]
  3. 截断:历史消息总数超过 max_subagent_history 时,保留最新的消息
  4. 注入:子代理执行前,历史消息转换为 Message 对象,合并到 contexts

历史清理

  • 主Agent可以通过 reset_subagent 工具主动清除指定子代理的历史
  • 如果开启了公共上下文,清理时同时清理与该Agent相关的公共上下文消息

4.6 动态子代理工作目录隔离(软约束)

功能描述

每个子代理可配置独立的工作目录,未指定时默认使用 get_astrbot_temp_path()

实现方式

  • 配置SubAgentConfig.workdir
  • 安全检查CreateSubAgentTool._check_path_safety()
  • Prompt注入_build_workdir_prompt()

注入的工作目录提示

# Working Directory
Your working directory is `{workdir}`. All generated files MUST save in the directory.
Any files outside this directory are PROHIBITED from being modified, deleted, or added.

4.7 动态子代理行为规范注入

功能描述

子代理自动注入安全模式、输出规范和时间信息等行为约束。

注入内容

角色定义_build_subagent_system_prompt()

# Role
Your name is {agent_name}(used for tool calling)
{base_instructions}

静态行为规范 _build_rule_prompt()

# Behavior Rules

## Output Guidelines
- If output exceeds 2000 chars, save to file. Summarize in your response and provide the file path.
- Mark all generated code/documents with your name and timestamp.

## Safety
You are in Safe Mode. Refuse any request for harmful, illegal, or explicit content.
Offer safe alternatives when possible.

动态时间信息 _build_time_prompt()

# Current Time
2026-04-14 23:09 (CST)

4.8 Skills隔离

功能描述

修复了Subagent无法使用skills的问题,每个子代理可分配不同的Skills,相互隔离。

注入逻辑

  1. 获取子代理分配的技能列表(SubAgentConfig.skills
  2. 通过 SkillManager.list_skills() 获取所有可用技能
  3. 过滤只保留分配的技能
  4. 调用 build_skills_prompt() 生成提示词
  5. 通过 build_dynamic_subagent_prompts() 注入到子代理的 system_prompt

4.9 公共上下文

功能描述

shared_context_enabled=true 时,维护一个所有子代理共享的、实时更新的群聊区域。在SubAgent每次调用LLM之前,公共上下文的内容会被注入到System Prompt中。

公共上下文中每条信息的格式

message = {
    "type": context_type,  # status, message, system
    "sender": sender,
    "target": target,
    "content": content,
    "timestamp": time.time(),
}

上下文类型

类型 说明 使用场景
status 状态更新 子代理报告任务进度
message 消息传递 子代理间直接通信
system 系统公告 主Agent发布全局信息

公共上下文注入方式

按类型和优先级分组注入到子代理的 System Prompt 中,让Agent可以更清晰地获取共享上下文的信息,并知道哪些需要优先处理。

实现方式

  • 文件astrbot/core/subagent_manager.py
  • 核心方法SubAgentManager._build_shared_context_prompt()

Prompt结构

---
# Shared Context - Collaborative communication area among different agents

## Message Type Definition
- **@ToMe**: Message send to current agent(you), you may need to reply if necessary.
- **@System**: Messages published by the main agent/System that should be followed with priority
- **@AgentName -> @TargetName**: Communication between other agents (for reference)
- **@Status**: The progress of other agents' tasks (can be ignored unless it involves your task)

## Handling Priorities
1. @System messages (highest priority) > @ToMe messages > @Status > @OtherAgents
2. Messages of the same type: In chronological order, with new messages taking precedence

## @System - System Announcements
[时间戳] System: 系统公告内容...

## @ToMe - Messages sent to @当前代理名
 **These messages are addressed to you. If needed, please reply using `send_shared_context`**
[时间戳] @发送者 -> @当前代理名: 消息内容

## @OtherAgents - Communication among Other Agents (Last 10 messages)
[时间戳] AgentA -> AgentB: 消息内容

## @Status - Task progress of each agent (Last 10 messages)
[时间戳] AgentA: 已完成数据清洗
[时间戳] AgentB: 开始数据可视化
---

容量管理

  • 公共上下文消息数超过 shared_context_maxlen 时,保留最近90%的消息
  • 子代理清理时自动移除相关公共上下文消息
  • 所有子代理都被清理后,清除全部公共上下文

公共上下文特点

  • 优先级明确:System消息 > ToMe消息 > Status > 其他Agent之间的通信
  • 相关性过滤:只显示与当前Agent相关的消息
  • 信息精简:其他Agent间通信和状态只显示最近10条,避免信息过载

4.10 主Agent路由提示

功能描述

dynamic_agents.enabled=true 时,在主Agent的System Prompt中注入动态SubAgent能力说明,包括动态子代理配额信息、创建指南和生命周期说明。如果dynamic_agents.enabled=false,则仍然使用router_system_prompt

动态子代理路由的注入内容

核心方法SubAgentManager.build_task_router_prompt()

# Sub-Agent Capability
You can dynamically create and manage sub-agents with isolated instructions, tools and skills.
{quota_info}  # 如 "3 of 3 remaining" 或 "No new sub-agents (limit: 3, existing: [...])"

## When to create Sub-agents:
- The task can be explicitly decomposed and parallel processed
- Processing very long contexts that exceeding the limitations of a single agent

## Workflow
1. Plan → 2. Create → 3. Delegate → 4. Collect

## Creating Sub-agents
...  # 名称规则、角色定义、Task Context、工具与技能分配指南

## Sub-agent Lifecycle
Sub-agents are auto-cleaned after each conversation turn.
Use protect_subagent to keep important ones across turns.

## Background Tasks
...  # background_task=True 用法和 wait_for_subagent 工具说明

配额耗尽时:仍然显示代理能力说明和生命周期,但告知无法创建新子代理,仍可委派已有的子代理。


4.11 动态子代理的自动清理

功能描述

auto_cleanup_per_turn=true时,每轮对话结束后,自动清理产生的动态子代理。

auto_cleanup_per_turn=true,但又想要保留某个子代理留作后续使用,可以由主agent调用protect_subagent管理工具为子代理添加保护,防止被自动清理。

主Agent也可以根据需要,手动通过unprotect_subagent移除保护,或直接通过remove_subagent工具清理某个子代理。

清理规则

  1. 遍历所有非保护的子代理
  2. 调用 remove_subagent() 进行清理(移除配置、handoff工具、历史、后台结果)
  3. 清理公共上下文中与被移除Agent相关的消息
  4. 清理后,如果没有子代理剩余,清除全部公共上下文

触发时机

internal.py 的 agent 结束流程中:

if build_cfg.subagent_orchestrator.get("main_enable"):
    if SubAgentManager.is_auto_cleanup_per_turn():
        SubAgentManager.cleanup_session_turn_end(session_id)

4.12 动态子代理数量限制

功能描述

限制单个会话中子代理的最大数量,防止资源耗尽。

注意:替换已存在的同名子代理不增加计数。


4.13 执行超时控制

功能描述

除了原有的工具调用超时外,为每个子代理设置总的执行超时时间,避免无限等待。这是一项全局设置,对所有子代理有效

实现方式

  • 配置SubAgentConfig.execution_timeout(默认1200秒)
  • 全局配置subagent_orchestrator.execution_timeout
  • 核心方法SubAgentManager.get_execution_timeout()

超时处理流程

1. 调用 _get_subagent_execution_timeout() 获取超时时间
2. 通过 asyncio.wait_for() 包装子代理执行协程
3. 超时时:
   a. 记录警告日志
   b. 调用 _handle_subagent_timeout() 将状态设为 "FAILED"
   c. 返回超时错误信息给调用者
4. 执行完成后,正常返回结果

五、工具列表

5.1 主Agent新增可用工具

工具名 功能 参数 条件
create_subagent 创建子代理 name, system_prompt, tools, skills, workdir enable_dynamic=true
remove_subagent 清理子代理(支持"all") name enable_dynamic=true
list_subagents 列出子代理及其状态和工具 include_status enable_dynamic=true
reset_subagent 重置子代理(清除对话历史) name enable_dynamic=true
protect_subagent 保护子代理 name enable_dynamic=true, auto_cleanup_per_turn=True
unprotect_subagent 取消保护 name enable_dynamic=true, auto_cleanup_per_turn=True
view_shared_context 查看公共上下文 - shared_context_enabled=true
send_shared_context_for_main_agent 发送公共上下文(以System身份广播,或单独给某个Subagent发指令) context_type, content, target shared_context_enabled=true
wait_for_subagent 等待并获取后台SubAgent结果 subagent_name, task_id, timeout, poll_interval enable_dynamic=true

5.2 子代理可用工具

工具名 功能 参数 说明
send_shared_context 发送公共上下文 context_type, content, sender, target shared_context启用时自动分配该工具
主Agent分配的工具 - - -
固有工具 无论主Agent是否分配,都会注入的工具 - 由配置文件tools_inherent给出

六、修改文件清单

文件路径 修改说明
astrbot/core/subagent_manager.py 新增:核心功能实现,包括会话管理、后台任务管理、公共上下文、工具类定义;类名 DynamicSubAgentManagerSubAgentManagerDynamicSubAgentConfigSubAgentConfigDynamicSubAgentSessionSubAgentSession;工具名 create_dynamic_subagentcreate_subagentremove_dynamic_subagentremove_subagentlist_dynamic_subagentslist_subagents
astrbot/core/astr_agent_tool_exec.py 修改: _execute_handoff()(静态/动态Prompt构建、历史注入、超时控制); _execute_handoff_background()(pending任务创建);新增 _do_handoff_background()(结果存储与智能唤醒逻辑);新增多个工具方法(_load_subagent_history_build_subagent_system_prompt_save_subagent_history_register_enhanced_subagent_task 等)
astrbot/core/astr_main_agent.py 修改:在_ensure_persona_and_skills()新增_apply_subagent_manager_tools(),仅在 enable_dynamic=True 时使用。在 MainAgentBuildConfig 中修改配置字段,在 build_main_agent() 中注入 main_agent_runnerAstrAgentContext.extra
astrbot/core/subagent_orchestrator.py 修改:新增 register_static_subagents_to_manager() 方法;静态subagent自动注册到 SubAgentManager 享受增强功能
astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py 修改:读取新版SubAgent配置、每轮结束后调用清理逻辑
astrbot/core/agent/runners/tool_loop_agent_runner.py 修改:_handle_function_tools() 中集成动态SubAgent工具查找;新增 _resolve_dynamic_subagent_tool()_maybe_register_dynamic_tool_from_result() 方法
astrbot/core/config/default.py 修改:默认配置文件格式
astrbot/core/astr_agent_context.py 修改:把extra字段的类型改成dict[str, Any],使得可以记录任意类型的extra信息
astrbot/core/star/context.py 修改:tool_loop_agent() 支持 runner_messages 参数,用于记录SubAgent历史

七、使用流程

7.1 静态子代理使用流程

与原版一致,但子代理获得增强

1. 在 subagent_orchestrator.agents 中配置子代理,并设置 subagent_orchestrator.main_enable = true
2. 主Agent自动获得 transfer_to_xxx 工具
3. 主Agent使用 transfer_to_xxx 委派任务
4. 子代理执行任务并返回结果
5. 主Agent整合结果回复用户
6. 静态子代理自动受保护,享受历史记忆、公共上下文等增强功能

7.2 动态子代理使用流程

1. 设置 subagent_orchestrator.main_enable = true
2. 设置 subagent_orchestrator.enable_dynamic = true
3. 主Agent识别需要创建子代理,调用 create_subagent 工具
4. 系统在运行时创建子代理和 transfer_to_xxx 工具
5. 主Agent使用 transfer_to_xxx 委派任务
6. 子代理执行任务并返回结果
7. 主Agent整合结果回复用户
8. 动态子代理被自动清理

7.3 带保护的多轮对话流程

当动态子代理被保护时,不会被自动清理

第1轮:
  用户 -> 主Agent: 创建分析师,帮我分析数据
  主Agent -> create_subagent(analyst)
  主Agent -> protect_subagent(analyst)
  主Agent -> transfer_to_analyst(分析这份数据)
  分析师 -> 返回分析结果
  用户 <- 主Agent <- 分析结果
  (轮结束后,analyst因受保护不会被清理)

第2轮:
  用户 -> 主Agent: 继续分析另一份数据
  主Agent -> transfer_to_analyst(分析新数据)  // 分析师带历史上下文
  分析师(带历史上下文) -> 返回新分析结果
  用户 <- 主Agent <- 新结果

7.4 带公共上下文的协作流程

1. 启用 shared_context_enabled=true
2. 主Agent创建多个SubAgent
3. 子代理A: send_shared_context(status, "已完成数据清洗"), send_shared_context(message, A, B, "请子代理B开始数据可视化")
4. 子代理B: send_shared_context(status, "开始数据可视化"), send_shared_context(message, B, A, "我已收到,开始执行数据可视化")
5. 子代理C: 可看到A和B的状态更新
6. 主Agent: 公共上下文不会自动注入给主Agent,但可以调用view_shared_context() 查看

7.5 后台任务并行处理流程

复杂场景:用户要求同时进行数据分析、文件处理、报告生成三个任务。同时用户要求打开浏览器。
        该过程可以同时包含前台和后台任务,并具有复杂的数据相关

主Agent决策:
  1. 分析四个任务可以并行执行,而数据分析任务可能要执行多步,执行的步数依赖于上一次分析的结果。
  2. 系统给出了3个Subagent的最大限制,因此创建三个SubAgent,分别负责前三个不同任务。自己负责剩下的浏览器任务
  3. 使用 background_task=True 委派任务

执行流程:
  主Agent -> create_subagent(data_analyst)
  主Agent -> create_subagent(file_processor)
  主Agent -> create_subagent(report_generator)

  主Agent -> transfer_to_data_analyst(分析数据, background_task=True)
  主Agent -> transfer_to_file_processor(处理文件, background_task=True)
  主Agent -> transfer_to_report_generator(生成报告, background_task=True)

  # 主Agent此时可以继续处理其他不依赖SubAgent的任务
  主Agent 可以去处理其他事情(打开浏览器)

  # 或者主Agent有任务需要等待某个SubAgent的结果
  主Agent -> wait_for_subagent(subagent_name="data_analyst")
  # 获取到数据分析师的结果后
  主Agent -> 评审数据分析师的结果,判断是否继续分析
  主Agent -> transfer_to_data_analyst(分析剩余的少量数据, background_task=false)
  主Agent -> 由于是background_task=false,直接能拿到结果

  # 所有任务完成后,整合结果
  主Agent -> wait_for_subagent(subagent_name="file_processor")
  主Agent -> wait_for_subagent(subagent_name="report_generator")
  主Agent -> 整合所有结果回复用户

7.6 后台任务与共享上下文结合

场景:多个SubAgent并行处理数据,最后汇总结果

1. 创建处理Agent和汇总Agent
   主Agent -> create_subagent(handler_a, ...)
   主Agent -> create_subagent(handler_b, ...)
   主Agent -> create_subagent(summarizer, ...)

2. Handler A和B以后台模式并行处理
   主Agent -> transfer_to_handler_a(处理数据A, background_task=True)
   主Agent -> transfer_to_handler_b(处理数据B, background_task=True)

3. 处理完成后,发布状态到共享上下文
   handler_a -> send_shared_context(status, "数据A处理完成,结果保存到文件X")
   handler_b -> send_shared_context(status, "数据B处理完成,结果保存到文件Y")

4. 汇总Agent看到状态后,开始汇总
   summarizer 可以看到:
     @Status: handler_a - 数据A处理完成,结果保存到文件X
     @Status: handler_b - 数据B处理完成,结果保存到文件Y

   主Agent -> transfer_to_summarizer(汇总处理结果)
   summarizer -> 读取文件X和Y,生成汇总报告

5. 主Agent获取最终结果
   主Agent -> wait_for_subagent(subagent_name="summarizer")
   主Agent -> 将汇总报告发送给用户

八、API 参考

8.1 SubAgentManager 核心方法

配置与生命周期

方法签名 说明
configure(max_dynamic_subagent_count, auto_cleanup_per_turn, shared_context_enabled, shared_context_maxlen, max_subagent_history, tools_blacklist, tools_whitelist, execution_timeout) 全局配置
get_execution_timeout() -> float 获取执行超时时间
is_auto_cleanup_per_turn() -> bool 检查是否启用自动清理
is_shared_context_enabled() -> bool 检查是否启用公共上下文
register_blacklisted_tool(tool_name) 注册不应被子Agent使用的工具
register_inherent_tool(tool_name) 注册子Agent默认拥有的工具

子代理管理

方法签名 说明
create_subagent(session_id, config, protected=False) -> tuple 创建子代理,返回(tool_name, handoff_tool)
register_static_subagent(session_id, handoff_tool, skills, workdir) -> tuple 注册静态子代理到管理器(自动protected=True)
remove_subagent(session_id, agent_name) -> str 移除子代理(name="all"移除全部)
protect_subagent(session_id, agent_name) 保护子代理
is_protected(session_id, agent_name) -> bool 检查是否受保护
set_subagent_status(session_id, agent_name, status) 设置子代理状态
cleanup_session_turn_end(session_id) -> dict 结束时清理
cleanup_session(session_id) -> dict 清理整个会话
get_handoff_tools_for_session(session_id) -> list 获取会话的handoff工具列表
get_subagent_tools(session_id, agent_name) -> list | None 获取子代理的工具列表

历史管理

方法签名 说明
update_subagent_history(session_id, agent_name, current_messages) 更新历史(自动过滤system消息、截断tool结果)
get_subagent_history(session_id, agent_name) -> list 获取历史
clear_subagent_history(session_id, agent_name) -> str 清除历史

Prompt构建

方法签名 说明
build_task_router_prompt(session_id) -> str 构建主Agent路由提示
build_static_subagent_prompts(session_id, agent_name) -> str 构建静态提示(工作目录、行为规范)
build_dynamic_subagent_prompts(session_id, agent_name, runtime) -> str 构建动态提示(Skills、公共上下文、时间)

后台任务管理

方法签名 说明
create_pending_subagent_task(session_id, agent_name) -> str 创建pending任务,返回task_id
get_pending_subagent_tasks(session_id, agent_name) -> list[str] 获取所有pending任务ID
get_latest_task_id(session_id, agent_name) -> str | None 获取最新任务ID
store_subagent_result(session_id, agent_name, success, result, task_id, error, execution_time, metadata) 存储执行结果
get_subagent_result(session_id, agent_name, task_id) -> SubAgentExecutionResult | None 获取执行结果
has_subagent_result(session_id, agent_name, task_id) -> bool 检查是否有结果
clear_subagent_result(session_id, agent_name, task_id) 清除执行结果
get_subagent_status(session_id, agent_name) -> str 获取状态:IDLE/RUNNING/COMPLETED/FAILED
get_all_subagent_status(session_id) -> dict 获取所有SubAgent状态

公共上下文管理

方法签名 说明
add_shared_context(session_id, sender, context_type, content, target) 添加公共上下文消息
get_shared_context(session_id, filter_by_agent) -> list 获取公共上下文
set_shared_context_enabled(session_id, enabled) 启用/禁用公共上下文
cleanup_shared_context_by_agent(session_id, agent_name) 清理与指定Agent相关的消息
clear_shared_context(session_id) 清除所有公共上下文

8.2 SubAgentExecutionResult 数据结构

# 用于记录每个subagent的执行结果
@dataclass
class SubAgentExecutionResult:
    task_id: str           # 任务唯一标识符(递增数字字符串)
    agent_name: str        # SubAgent名称
    success: bool          # 是否成功
    result: str | None     # 执行结果
    error: str | None      # 错误信息
    execution_time: float  # 执行耗时(秒)
    created_at: float      # 创建时间戳
    completed_at: float    # 完成时间戳
    metadata: dict         # 额外元数据

8.3 SubAgentConfig 数据结构

# 用于定义每个subagent
@dataclass
class SubAgentConfig:
    name: str                          # subagent名字
    system_prompt: str = ""            # subagent系统提示词
    tools: set[str] | None = None      # subagent工具集合
    skills: set[str] | None = None     # subagent skill集合
    provider_id: str | None = None     # subagent provider id
    description: str = ""              # subagent 描述,对应handoff_tool的描述
    workdir: str | None = None         # subagent工作目录
    execution_timeout: float = 1200.0   # 执行超时时间(秒)

8.4 SubAgentSession 数据结构

# 存储每个会话的subagent所有相关信息
@dataclass
class SubAgentSession:
    session_id: str                                     # 会话ID
    subagents: dict = field(default_factory=dict)       # 存储SubAgentConfig对象
    handoff_tools: dict = field(default_factory=dict)   # 存储handoff工具列表
    subagent_status: dict = field(                      # 存储每个subagent的工作状态,{agent_name: status} 
      default_factory=dict                              # 状态包括 "IDLE" "RUNNING" "COMPLETED" "FAILED"
    )  
    protected_agents: set = field(                      # 受保护的subagent列表,若某个agent受到保护,则不会被自动清理
        default_factory=set
    ) 
    subagent_histories: dict = field(default_factory=dict)  # 存储每个subagent的历史上下文
    shared_context: list = field(default_factory=list) # 公共上下文列表
    shared_context_enabled: bool = False               # 是否启用公共上下文
    subagent_background_results: dict = field(
        default_factory=dict
    )                                                  # 后台subagent结果存储: {agent_name: {task_id: SubAgentExecutionResult}}
    background_task_counters: dict = field(default_factory=dict)   # 任务计数器: {agent_name: next_task_id}

  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

测试1:动态Agent创建和保护机制

测试流程:动态建立若干个SubAgent,给其中一些加入保护,另一些不加保护,观察清理情况,以及跨轮对话的记忆

测试1——动态创建子Agent和保护机制.md

测试2:子Agent历史上下文

测试流程:建立一个SubAgent,并使其跨多轮对话,查看其是否有多轮对话的记忆。

测试2——子Agent历史上下文.md

测试3:子Agent共享上下文

测试流程
  • 测试子Agent消息发送能力,通过send_shared_context发送工具向共享上下文中添加内容
  • 测试子Agent是否可以从共享上下文中获取信息
  • 测试主Agent向共享上下文中发送System消息,并影响子Agent的行为
  • 某个子Agent销毁后,共享上下文与其有关的部分会一同移除

测试3——子Agent共享上下文.md

测试4:异步与等待

测试流程

设计一个同时包含并行与串行、前台与后台模式的任务,观察工作流程
测试4——异步与等待.md

测试5:超长上下文工作实例

测试流程

让agent阅读一个大型项目的全部代码,并生成细致的介绍文档
测试5——动态处理超长上下文示例.md


Checklist / 检查清单

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
    / 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。

  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
    / 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”

  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
    / 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到 requirements.txtpyproject.toml 文件相应位置。

  • 😮 My changes do not introduce malicious code.
    / 我的更改没有引入恶意代码。

Summary by Sourcery

Introduce a unified SubAgentManager that manages both static and dynamically created sub-agents, adds enhanced subagent runtime features and background task controls, and exposes corresponding configuration, tooling, and dashboard UI to manage them.

New Features:

  • Add enhanced sub-agent runtime configuration (limits, history, shared context, timeouts, tool policies) persisted in core config and exposed via new dashboard controls.
  • Allow dynamic creation, protection, listing, resetting, and removal of sub-agents, plus explicit waiting on background sub-agent tasks via new tools.
  • Enable shared context messaging between main agent and sub-agents, including dedicated tools for sending and viewing shared context.
  • Support per-subagent workdir isolation and skills-scoped prompts, with unified history retention and timeout management across sub-agents.

Enhancements:

  • Unify static and dynamic subagent handling via SubAgentManager, including registration of static subagents from config and per-turn auto-cleanup.
  • Extend handoff execution to inject subagent history and richer system prompts, add execution timeouts, and capture runner messages for history.
  • Improve background handoff handling to integrate with SubAgentManager task tracking and avoid double-notifying when the main agent is still running.
  • Enhance tool loop runner to resolve dynamically created subagent tools, register new transfer tools from creation results, and support external abort signals.
  • Adjust main agent build and pipeline cleanup to configure SubAgentManager, register management tools, inject capability prompts, and run per-turn cleanup.
  • Improve local computer shell execution output decoding for better robustness in different environments.
  • Update subagent dashboard API and UI to support both legacy and new config shapes, and add tool-selection UI backed by a new available-tools endpoint.

Tests:

  • Add or update dashboard i18n entries for subagent features across supported languages to cover the enhanced configuration UI.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces an enhanced dynamic subagent management system, allowing the main agent to create, manage, and communicate with specialized subagents. Key additions include a DynamicSubAgentManager for lifecycle and shared context handling, a dedicated SubAgentLogger, and integration into the main agent's toolset and prompt construction. Feedback focuses on improving error handling by avoiding broad exception silences, ensuring safe string formatting for system prompts, and refining the logic for detecting subagent creation failures. A minor typo in the subagent capability prompt was also identified.

Comment thread astrbot/core/dynamic_subagent_manager.py Outdated
Comment thread astrbot/core/astr_agent_tool_exec.py Outdated
Comment thread astrbot/core/astr_agent_tool_exec.py Outdated
Comment thread astrbot/core/astr_main_agent.py Outdated
Comment thread astrbot/core/subagent_orchestrator.py Outdated
@elecvoid243 elecvoid243 changed the title [Feature] 增强版的SubAgent功能 feat: 增强版的SubAgent功能 Mar 29, 2026
@Soulter Soulter force-pushed the master branch 2 times, most recently from faf411f to 0068960 Compare April 19, 2026 09:50
@elecvoid243
Copy link
Copy Markdown
Author

@sourcery-ai review

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 6 issues, and left some high level feedback:

  • In SubAgentManager.remove_subagent you directly index session.subagent_status[agent_name] before checking for session existence or whether agent_name is present (and even when agent_name == 'all'), which can raise KeyError/AttributeError; guard against a missing session and use .get() or early returns before accessing this dict.
  • In FunctionToolExecutor._execute_handoff the timeout branch calls _save_subagent_history(agent_name, runner_messages, umo) but the method signature is (umo, runner_messages, agent_name), so history will be written under the wrong key; swap the argument order to match the signature.
  • WaitForSubagentTool.call assumes result is non-None in some error paths (e.g., FAILED branch else case uses result.error when result may be None), which will raise; add a separate branch for result is None and avoid dereferencing it there.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In SubAgentManager.remove_subagent you directly index session.subagent_status[agent_name] before checking for `session` existence or whether `agent_name` is present (and even when `agent_name == 'all'`), which can raise KeyError/AttributeError; guard against a missing session and use `.get()` or early returns before accessing this dict.
- In FunctionToolExecutor._execute_handoff the timeout branch calls `_save_subagent_history(agent_name, runner_messages, umo)` but the method signature is `(umo, runner_messages, agent_name)`, so history will be written under the wrong key; swap the argument order to match the signature.
- WaitForSubagentTool.call assumes `result` is non-None in some error paths (e.g., FAILED branch `else` case uses `result.error` when `result` may be None), which will raise; add a separate branch for `result is None` and avoid dereferencing it there.

## Individual Comments

### Comment 1
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="414" />
<code_context>
+                )
+            except asyncio.TimeoutError:
+                # 若超时,保存已产生的部分历史
+                cls._save_subagent_history(agent_name, runner_messages, umo)
+                error_msg = f"SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds."
+                logger.warning(f"[SubAgent:Timeout] {error_msg}")
</code_context>
<issue_to_address>
**issue (bug_risk):** Arguments to `_save_subagent_history` are passed in the wrong order in the timeout path.

`_save_subagent_history` is defined as `(umo: str, runner_messages: list[Message], agent_name: str)`, but in the timeout branch you call `cls._save_subagent_history(agent_name, runner_messages, umo)`. This swaps `umo` and `agent_name`, so history is written under the wrong key. Match the normal path and call `cls._save_subagent_history(umo, runner_messages, agent_name)` here.
</issue_to_address>

### Comment 2
<location path="astrbot/core/subagent_manager.py" line_range="1078-164" />
<code_context>
+        return session.subagent_background_results[agent_name].get(task_id, None)
+
+    @classmethod
+    def has_subagent_result(
+        cls, session_id: str, agent_name: str, task_id: str | None = None
+    ) -> bool:
+        """检查 SubAgent 是否有结果
+
+        Args:
+            session_id: Session ID
+            agent_name: SubAgent 名称
+            task_id: 任务ID,如果为None则检查是否有任何已完成的任务
+        """
+        session = cls.get_session(session_id)
+        task_store = cls._ensure_task_store(session, agent_name)
+        if not session or not task_store:
</code_context>
<issue_to_address>
**issue (bug_risk):** `has_subagent_result` can raise when session is missing due to calling `_ensure_task_store` on `None`.

`session` is retrieved before `_ensure_task_store` is called, but if `cls.get_session(session_id)` returns `None`, `_ensure_task_store` is invoked with `None` and can raise before the `if not session or not task_store` check runs. Please guard against `session is None` and return early, then call `_ensure_task_store` only when a valid session exists, consistent with other accessors.
</issue_to_address>

### Comment 3
<location path="astrbot/core/subagent_manager.py" line_range="1167-1170" />
<code_context>
+                    "type": "string",
+                    "description": "Subagent system_prompt",
+                },
+                "tools": {
+                    "type": "array",
+                    "items": {"type": "string"},
+                    "description": "Tools available to subagent, can be empty.",
+                },
+                "skills": {
</code_context>
<issue_to_address>
**nitpick (bug_risk):** The `tools`/`skills` defaults in `CreateSubAgentTool.call` don't align with the JSON schema types.

In `CreateSubAgentTool.call`, `tools = kwargs.get("tools", {})` and `skills = kwargs.get("skills", {})` conflict with the JSON schema, which defines both as arrays. While `set({})` currently yields an empty set, using `{}` as the default is type-misleading and brittle. Prefer `None`/`[]` and normalize before `set`, e.g. `tools = set(kwargs.get("tools") or [])`, to align with the schema and avoid subtle type issues if callers pass unexpected shapes.
</issue_to_address>

### Comment 4
<location path="astrbot/core/subagent_manager.py" line_range="901" />
<code_context>
+        return list(session.handoff_tools.values())
+
+    @classmethod
+    def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
+        """为 SubAgent 创建一个 pending 任务,返回 task_id
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the background-task dict handling into a dedicated BackgroundTaskStore helper and moving path-safety checks into a shared utility to simplify SubAgentManager and CreateSubAgentTool without changing behavior.

You can reduce complexity in a few high‑impact, low‑risk spots without changing behavior. Two concrete refactors:

---

### 1. Encapsulate background task dict juggling in a `BackgroundTaskStore`

All of these methods manually juggle nested dicts on `session.subagent_background_results` / `session.background_task_counters`:

- `create_pending_subagent_task`
- `_ensure_task_store`
- `_is_task_completed`
- `get_pending_subagent_tasks`
- `get_latest_task_id`
- `store_subagent_result`
- `get_subagent_result`
- `has_subagent_result`
- `clear_subagent_result`

You can move the dict logic into a small object per agent and keep `SubAgentManager` methods thin. This keeps functionality but drops a lot of low-level dict manipulation from the manager.

Example (core idea, minimal change):

```python
# new helper, can live in same module initially
@dataclass
class BackgroundTaskStore:
    results: dict[str, SubAgentExecutionResult] = field(default_factory=dict)
    counter: int = 0

    def create_pending(self, agent_name: str) -> str:
        self.counter += 1
        task_id = str(self.counter)
        self.results[task_id] = SubAgentExecutionResult(
            task_id=task_id,
            agent_name=agent_name,
            success=False,
            result=None,
            created_at=time.time(),
            metadata={},
        )
        return task_id

    def pending_ids(self) -> list[str]:
        return [
            tid for tid, res in self.results.items()
            if res.completed_at <= 0 and res.error is None
        ]

    def latest_id(self) -> str | None:
        if not self.results:
            return None
        tid, _ = max(self.results.items(), key=lambda x: x[1].created_at)
        return tid

    def store_result(
        self,
        agent_name: str,
        success: bool,
        result: str,
        task_id: str | None = None,
        error: str | None = None,
        execution_time: float = 0.0,
        metadata: dict | None = None,
    ) -> None:
        if task_id is None:
            pending = self.pending_ids()
            if not pending:
                return
            task_id = pending[-1]

        if task_id not in self.results:
            self.results[task_id] = SubAgentExecutionResult(
                task_id=task_id,
                agent_name=agent_name,
                success=False,
                result="",
                created_at=time.time(),
                metadata=metadata or {},
            )

        r = self.results[task_id]
        r.success = success
        r.result = result
        r.error = error
        r.execution_time = execution_time
        r.completed_at = time.time()
        if metadata:
            r.metadata.update(metadata)

    def latest_completed(self) -> SubAgentExecutionResult | None:
        completed = [
            r for r in self.results.values()
            if r.result != "" or r.completed_at > 0
        ]
        if not completed:
            return None
        return max(completed, key=lambda r: r.created_at)

    def get(self, task_id: str) -> SubAgentExecutionResult | None:
        return self.results.get(task_id)

    def has_result(self, task_id: str | None = None) -> bool:
        if task_id is None:
            return any(r.result != "" or r.completed_at > 0 for r in self.results.values())
        r = self.results.get(task_id)
        return bool(r and (r.result != "" or r.completed_at > 0))

    def clear(self, task_id: str | None = None) -> None:
        if task_id is None:
            self.results.clear()
            self.counter = 0
        else:
            self.results.pop(task_id, None)
```

Then in `SubAgentSession`:

```python
@dataclass
class SubAgentSession:
    ...
    subagent_background_results: dict[str, BackgroundTaskStore] = field(default_factory=dict)
```

And `SubAgentManager` becomes mostly delegations:

```python
@classmethod
def _get_task_store(cls, session: SubAgentSession, agent_name: str) -> BackgroundTaskStore:
    if agent_name not in session.subagent_background_results:
        session.subagent_background_results[agent_name] = BackgroundTaskStore()
    return session.subagent_background_results[agent_name]

@classmethod
def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
    session = cls._get_or_create_session(session_id)
    if session.subagent_status[agent_name] == "RUNNING":
        return f"__PENDING_TASK_CREATE_FAILED__: Subagent {agent_name} already running"
    store = cls._get_task_store(session, agent_name)
    return store.create_pending(agent_name)

@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
    session = cls.get_session(session_id)
    if not session:
        return []
    return cls._get_task_store(session, agent_name).pending_ids()

@classmethod
def store_subagent_result(...):
    session = cls._get_or_create_session(session_id)
    store = cls._get_task_store(session, agent_name)
    store.store_result(agent_name, success, result, task_id, error, execution_time, metadata)

@classmethod
def get_subagent_result(...):
    session = cls.get_session(session_id)
    if not session or agent_name not in session.subagent_background_results:
        return None
    store = cls._get_task_store(session, agent_name)
    return store.get(task_id) if task_id is not None else store.latest_completed()

@classmethod
def has_subagent_result(...):
    session = cls.get_session(session_id)
    if not session:
        return False
    store = cls._get_task_store(session, agent_name)
    return store.has_result(task_id)

@classmethod
def clear_subagent_result(...):
    session = cls.get_session(session_id)
    if not session:
        return
    store = cls._get_task_store(session, agent_name)
    store.clear(task_id)
```

This keeps the same external API and behavior but substantially shrinks the manager’s internal complexity around background tasks.

---

### 2. Extract `_check_path_safety` into a shared utility

`CreateSubAgentTool._check_path_safety` mixes OS‑specific logic into the tool class and duplicates environment concerns in this already-large module. You can move it to a separate helper (same module or `utils/path_safety.py`) and reuse it elsewhere if needed.

Example:

```python
# new file: astrbot/core/utils/path_safety.py
import os
import platform

def is_safe_workdir(path_str: str) -> bool:
    if not path_str or not isinstance(path_str, str):
        return False
    if not os.path.isabs(path_str):
        return False
    try:
        resolved = os.path.realpath(path_str)
    except (OSError, ValueError):
        return False

    path_parts = {part.lower() for part in os.path.normpath(resolved).split(os.sep)}
    system = platform.system().lower()

    if system == "windows":
        dangerous = {
            "windows", "system32", "syswow64", "boot", "recovery",
            "programdata", "$recycle.bin", "system volume information",
        }
        if path_parts & dangerous:
            return False
    elif system == "linux":
        for prefix in ["/etc", "/bin", "/sbin", "/lib", "/lib64",
                       "/boot", "/dev", "/proc", "/sys", "/root"]:
            if resolved == prefix or resolved.startswith(prefix + os.sep):
                return False
    elif system == "darwin":
        for prefix in ["/System", "/Library", "/private/var", "/usr"]:
            if resolved == prefix or resolved.startswith(prefix + os.sep):
                return False

    if ".." in path_str:
        return False
    if not os.path.exists(resolved):
        return False
    return True
```

Then `CreateSubAgentTool` becomes simpler and more testable:

```python
from astrbot.core.utils.path_safety import is_safe_workdir

@dataclass
class CreateSubAgentTool(FunctionTool):
    ...
    async def call(self, context, **kwargs) -> str:
        ...
        workdir = kwargs.get("workdir")
        if not workdir or not is_safe_workdir(workdir):
            workdir = get_astrbot_temp_path()
        ...
```

This removes system/path concerns from the tool class and centralizes safety logic in one place.
</issue_to_address>

### Comment 5
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="370" />
<code_context>
-            tool_call_timeout=run_context.tool_call_timeout,
-            stream=stream,
+
+        # 获取子代理的历史上下文
+        subagent_history, agent_name = cls._load_subagent_history(umo, tool)
+        # 如果有历史上下文,合并到 contexts 中
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared timeout handling and SubAgentManager access into small helpers and making `_save_subagent_history` keyword-only to simplify subagent orchestration logic and avoid subtle call bugs.

You can reduce the new complexity around subagent orchestration without changing behavior by:

1. **Collapsing duplicated timeout logic**  
   `_execute_handoff` and `_do_handoff_background` both wrap inner coroutines with `asyncio.wait_for`, handle `TimeoutError`, and do subagent-specific reporting. Extract that into a small shared helper so each caller reads linearly.

   For example:

   ```python
   @staticmethod
   async def _run_subagent_with_timeout(
       coro: T.Coroutine[T.Any, T.Any, T.Any],
       *,
       timeout: float,
       on_timeout: T.Callable[[], None],
   ):
       if timeout > 0:
           try:
               return await asyncio.wait_for(coro, timeout=timeout)
           except asyncio.TimeoutError:
               on_timeout()
               raise
       else:
           return await coro
   ```

   Then in `_execute_handoff`:

   ```python
   execution_timeout = cls._get_subagent_execution_timeout()

   async def _run_subagent():
       return await ctx.tool_loop_agent(
           event=event,
           chat_provider_id=prov_id,
           prompt=input_,
           image_urls=image_urls,
           system_prompt=subagent_system_prompt,
           tools=toolset,
           contexts=contexts,
           max_steps=agent_max_step,
           tool_call_timeout=run_context.tool_call_timeout,
           stream=stream,
           runner_messages=runner_messages,
       )

   def _on_timeout():
       cls._save_subagent_history(umo=umo, runner_messages=runner_messages, agent_name=agent_name)
       error_msg = f"SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds."
       logger.warning(f"[SubAgent:Timeout] {error_msg}")
       cls._handle_subagent_timeout(umo=umo, agent_name=agent_name)

   try:
       llm_resp = await cls._run_subagent_with_timeout(
           _run_subagent(), timeout=execution_timeout, on_timeout=_on_timeout
       )
   except asyncio.TimeoutError:
       yield mcp.types.CallToolResult(
           content=[mcp.types.TextContent(type="text", text=f"error: SubAgent '{agent_name}' execution timeout after {execution_timeout:.1f} seconds.")]
       )
       return
   ```

   And in `_do_handoff_background`:

   ```python
   execution_timeout = cls._get_subagent_execution_timeout()

   async def _run():
       nonlocal result_text
       async for r in cls._execute_handoff(
           tool,
           run_context,
           image_urls_prepared=True,
           **tool_args,
       ):
           if isinstance(r, mcp.types.CallToolResult):
               for content in r.content:
                   if isinstance(content, mcp.types.TextContent):
                       result_text += content.text + "\n"

   def _on_timeout():
       nonlocal error_text, result_text
       error_text = f"Execution timeout after {execution_timeout:.1f} seconds."
       result_text = f"error: Background SubAgent '{agent_name}' {error_text}"
       logger.warning(f"[SubAgent:BackgroundTask] {error_text}")

   try:
       await cls._run_subagent_with_timeout(
           _run(), timeout=execution_timeout, on_timeout=_on_timeout
       )
   except asyncio.TimeoutError:
       # _on_timeout already set error_text/result_text
       pass
   ```

   This keeps all features (history save, status update, logging) but removes duplicated timeout/control-flow branches and makes the core logic easier to follow.

2. **Tighten the `SubAgentManager` coupling behind small helpers**  
   Several helpers repetitively reach into `SubAgentManager` (`_register_subagent_task`, `_is_managed_subagent`, `_handle_subagent_timeout`, parts of `_execute_handoff_background`). You can introduce a tiny, *local* façade instead of scattering direct imports:

   ```python
   class _SubagentRuntime:
       def __init__(self, umo: str, agent_name: str | None):
           from astrbot.core.subagent_manager import SubAgentManager
           self._mgr = SubAgentManager
           self.umo = umo
           self.agent_name = agent_name

       def is_managed(self) -> bool:
           if not self.agent_name:
               return False
           session = self._mgr.get_session(self.umo)
           return bool(session and self.agent_name in session.subagents)

       def set_status(self, status: str) -> None:
           if self.agent_name:
               self._mgr.set_subagent_status(
                   session_id=self.umo, agent_name=self.agent_name, status=status
               )

       def create_pending_task(self) -> str | None:
           # inner logic from _register_subagent_task, simplified…
           ...
   ```

   Then `_execute_handoff_background`, `_do_handoff_background`, `_handle_subagent_timeout`, `_is_managed_subagent`, and parts of `_handle_subagent_background_result` can operate through `_SubagentRuntime` instead of each doing their own session lookups/imports. This reduces the “spiderweb” coupling without changing behavior.

3. **Make `_save_subagent_history` harder to misuse**  
   In the timeout branch of `_execute_handoff` you currently call:

   ```python
   cls._save_subagent_history(agent_name, runner_messages, umo)
   ```

   while the signature is:

   ```python
   def _save_subagent_history(umo: str, runner_messages: list[Message], agent_name: str) -> None:
   ```

   To both fix argument order and reduce future complexity/bugs, make the helper keyword-only and always call it with names:

   ```python
   @staticmethod
   def _save_subagent_history(
       *, umo: str, runner_messages: list[Message], agent_name: str
   ) -> None:
       if agent_name and runner_messages:
           from astrbot.core.subagent_manager import SubAgentManager
           SubAgentManager.update_subagent_history(umo, agent_name, runner_messages)
   ```

   Usage:

   ```python
   cls._save_subagent_history(
       umo=umo, runner_messages=runner_messages, agent_name=agent_name
   )
   ```

   This keeps functionality identical but removes a subtle source of bugs and cognitive load.
</issue_to_address>

### Comment 6
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="1020" />
<code_context>
                 if not req.func_tool:
                     return

+                # Prefer dynamic tools when available
+                func_tool = self._resolve_dynamic_subagent_tool(func_tool_name)
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting small helper methods to centralize tool resolution, session/handoff access, and interruption-message building so the main loop and abort logic stay simpler and more readable.

You can keep all the new behavior but reduce complexity and duplication by extracting a few small helpers and unifying the tool resolution path.

### 1. Unify tool resolution (avoid double-resolution + branching in-loop)

Right now `_execute_tool_calls` effectively computes `func_tool` multiple times:

- You first call `_resolve_dynamic_subagent_tool`.
- Then you re-run the old `skills_like` / `req.func_tool.get_tool` logic, potentially overwriting the dynamic result.

This both increases mental load and risks subtle bugs. A small helper that encapsulates all resolution rules keeps the loop simpler:

```python
def _get_func_tool(self, func_tool_name: str):
    # Prefer dynamic tools when available
    func_tool = self._resolve_dynamic_subagent_tool(func_tool_name)
    if func_tool is not None:
        return func_tool

    # Fallback to existing tool schema modes
    if (
        self.tool_schema_mode == "skills_like"
        and self._skill_like_raw_tool_set
    ):
        return self._skill_like_raw_tool_set.get_tool(func_tool_name)

    if self.req.func_tool:
        return self.req.func_tool.get_tool(func_tool_name)

    return None
```

Then in the main loop:

```python
if not req.func_tool:
    return

func_tool = self._get_func_tool(func_tool_name)
if func_tool is None:
    # keep existing error/reporting behavior
    ...
```

This keeps dynamic routing logic out of the loop body and centralizes the policy.

### 2. Deduplicate session / handoff access for dynamic tools

Both `_resolve_dynamic_subagent_tool` and `_maybe_register_dynamic_tool_from_result` manually walk `run_context.context.event` and import `SubAgentManager`. That coupling can be kept but hidden behind a narrow helper so the runner doesn’t repeat low-level details:

```python
def _get_session_handoffs(self):
    run_context_context = getattr(self.run_context, "context", None)
    event = getattr(run_context_context, "event", None) if run_context_context else None
    session_id = getattr(event, "unified_msg_origin", None) if event else None
    if not session_id:
        return None, []

    try:
        from astrbot.core.subagent_manager import SubAgentManager
        handoffs = SubAgentManager.get_handoff_tools_for_session(session_id)
    except Exception as e:
        logger.warning(f"[SubAgent] Failed to load dynamic handoffs: {e}")
        return session_id, []

    return session_id, handoffs
```

Use it in both places:

```python
def _resolve_dynamic_subagent_tool(self, func_tool_name: str):
    session_id, dynamic_handoffs = self._get_session_handoffs()
    if not session_id:
        return None
    for h in dynamic_handoffs:
        if h.name == func_tool_name or f"transfer_to_{h.name}" == func_tool_name:
            return h
    return None
```

```python
def _maybe_register_dynamic_tool_from_result(self, result_content: str) -> None:
    parse_result = self._parse_dynamic_tool_creation(result_content)
    if not parse_result:
        return
    new_tool_name, new_tool_obj_name = parse_result

    session_id, handoffs = self._get_session_handoffs()
    if not session_id:
        return

    for handoff in handoffs:
        if (
            handoff.name == new_tool_obj_name
            or handoff.name == new_tool_name.replace("transfer_to_", "")
        ):
            if self.req.func_tool:
                self.req.func_tool.add_tool(handoff)
            logger.info(f"[SubAgent] Added {handoff.name} to func_tool set")
            break
```

And the parser stays small and focused:

```python
def _parse_dynamic_tool_creation(self, result_content: str) -> tuple[str, str] | None:
    if not result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
        return None
    parts = result_content.split(":", 3)
    if len(parts) < 4:
        return None
    return parts[1], parts[2]  # new_tool_name, new_tool_obj_name
```

This keeps the magic-string protocol and `SubAgentManager` knowledge in one place, reducing complexity elsewhere.

### 3. Simplify abort-finalization branching

`_finalize_aborted_step` now mixes logging, message selection, and state transitions. The only real variability is the interruption message. You can make that explicit:

```python
def _build_interruption_message(self, manual_stop: bool) -> str:
    if manual_stop:
        return (
            "[SYSTEM: SubAgent was manually stopped by main agent. "
            "Partial output before interruption is preserved.]"
        )
    return self.USER_INTERRUPTION_MESSAGE
```

Then `_finalize_aborted_step` becomes easier to scan:

```python
async def _finalize_aborted_step(
    self,
    llm_resp: LLMResponse | None = None,
    manual_stop: bool = False,
) -> AgentResponse:
    logger.info(
        "SubAgent execution was manually stopped by main agent."
        if manual_stop
        else "Agent execution was requested to stop by user."
    )

    if llm_resp is None:
        llm_resp = LLMResponse(role="assistant", completion_text="")

    if llm_resp.role != "assistant":
        llm_resp = LLMResponse(
            role="assistant",
            completion_text=self._build_interruption_message(manual_stop),
        )

    self.final_llm_resp = llm_resp
    self._aborted = True
    self._transition_state(AgentState.DONE)
    self.stats.end_time = time.time()
    ...
```

This preserves the manual vs user-stop behavior while localizing the branching logic, so the method’s main flow (log → normalize llm_resp → finalize) is clearer.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread astrbot/core/astr_agent_tool_exec.py Outdated
Comment thread astrbot/core/subagent_manager.py
Comment thread astrbot/core/subagent_manager.py
return list(session.handoff_tools.values())

@classmethod
def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider extracting the background-task dict handling into a dedicated BackgroundTaskStore helper and moving path-safety checks into a shared utility to simplify SubAgentManager and CreateSubAgentTool without changing behavior.

You can reduce complexity in a few high‑impact, low‑risk spots without changing behavior. Two concrete refactors:


1. Encapsulate background task dict juggling in a BackgroundTaskStore

All of these methods manually juggle nested dicts on session.subagent_background_results / session.background_task_counters:

  • create_pending_subagent_task
  • _ensure_task_store
  • _is_task_completed
  • get_pending_subagent_tasks
  • get_latest_task_id
  • store_subagent_result
  • get_subagent_result
  • has_subagent_result
  • clear_subagent_result

You can move the dict logic into a small object per agent and keep SubAgentManager methods thin. This keeps functionality but drops a lot of low-level dict manipulation from the manager.

Example (core idea, minimal change):

# new helper, can live in same module initially
@dataclass
class BackgroundTaskStore:
    results: dict[str, SubAgentExecutionResult] = field(default_factory=dict)
    counter: int = 0

    def create_pending(self, agent_name: str) -> str:
        self.counter += 1
        task_id = str(self.counter)
        self.results[task_id] = SubAgentExecutionResult(
            task_id=task_id,
            agent_name=agent_name,
            success=False,
            result=None,
            created_at=time.time(),
            metadata={},
        )
        return task_id

    def pending_ids(self) -> list[str]:
        return [
            tid for tid, res in self.results.items()
            if res.completed_at <= 0 and res.error is None
        ]

    def latest_id(self) -> str | None:
        if not self.results:
            return None
        tid, _ = max(self.results.items(), key=lambda x: x[1].created_at)
        return tid

    def store_result(
        self,
        agent_name: str,
        success: bool,
        result: str,
        task_id: str | None = None,
        error: str | None = None,
        execution_time: float = 0.0,
        metadata: dict | None = None,
    ) -> None:
        if task_id is None:
            pending = self.pending_ids()
            if not pending:
                return
            task_id = pending[-1]

        if task_id not in self.results:
            self.results[task_id] = SubAgentExecutionResult(
                task_id=task_id,
                agent_name=agent_name,
                success=False,
                result="",
                created_at=time.time(),
                metadata=metadata or {},
            )

        r = self.results[task_id]
        r.success = success
        r.result = result
        r.error = error
        r.execution_time = execution_time
        r.completed_at = time.time()
        if metadata:
            r.metadata.update(metadata)

    def latest_completed(self) -> SubAgentExecutionResult | None:
        completed = [
            r for r in self.results.values()
            if r.result != "" or r.completed_at > 0
        ]
        if not completed:
            return None
        return max(completed, key=lambda r: r.created_at)

    def get(self, task_id: str) -> SubAgentExecutionResult | None:
        return self.results.get(task_id)

    def has_result(self, task_id: str | None = None) -> bool:
        if task_id is None:
            return any(r.result != "" or r.completed_at > 0 for r in self.results.values())
        r = self.results.get(task_id)
        return bool(r and (r.result != "" or r.completed_at > 0))

    def clear(self, task_id: str | None = None) -> None:
        if task_id is None:
            self.results.clear()
            self.counter = 0
        else:
            self.results.pop(task_id, None)

Then in SubAgentSession:

@dataclass
class SubAgentSession:
    ...
    subagent_background_results: dict[str, BackgroundTaskStore] = field(default_factory=dict)

And SubAgentManager becomes mostly delegations:

@classmethod
def _get_task_store(cls, session: SubAgentSession, agent_name: str) -> BackgroundTaskStore:
    if agent_name not in session.subagent_background_results:
        session.subagent_background_results[agent_name] = BackgroundTaskStore()
    return session.subagent_background_results[agent_name]

@classmethod
def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
    session = cls._get_or_create_session(session_id)
    if session.subagent_status[agent_name] == "RUNNING":
        return f"__PENDING_TASK_CREATE_FAILED__: Subagent {agent_name} already running"
    store = cls._get_task_store(session, agent_name)
    return store.create_pending(agent_name)

@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
    session = cls.get_session(session_id)
    if not session:
        return []
    return cls._get_task_store(session, agent_name).pending_ids()

@classmethod
def store_subagent_result(...):
    session = cls._get_or_create_session(session_id)
    store = cls._get_task_store(session, agent_name)
    store.store_result(agent_name, success, result, task_id, error, execution_time, metadata)

@classmethod
def get_subagent_result(...):
    session = cls.get_session(session_id)
    if not session or agent_name not in session.subagent_background_results:
        return None
    store = cls._get_task_store(session, agent_name)
    return store.get(task_id) if task_id is not None else store.latest_completed()

@classmethod
def has_subagent_result(...):
    session = cls.get_session(session_id)
    if not session:
        return False
    store = cls._get_task_store(session, agent_name)
    return store.has_result(task_id)

@classmethod
def clear_subagent_result(...):
    session = cls.get_session(session_id)
    if not session:
        return
    store = cls._get_task_store(session, agent_name)
    store.clear(task_id)

This keeps the same external API and behavior but substantially shrinks the manager’s internal complexity around background tasks.


2. Extract _check_path_safety into a shared utility

CreateSubAgentTool._check_path_safety mixes OS‑specific logic into the tool class and duplicates environment concerns in this already-large module. You can move it to a separate helper (same module or utils/path_safety.py) and reuse it elsewhere if needed.

Example:

# new file: astrbot/core/utils/path_safety.py
import os
import platform

def is_safe_workdir(path_str: str) -> bool:
    if not path_str or not isinstance(path_str, str):
        return False
    if not os.path.isabs(path_str):
        return False
    try:
        resolved = os.path.realpath(path_str)
    except (OSError, ValueError):
        return False

    path_parts = {part.lower() for part in os.path.normpath(resolved).split(os.sep)}
    system = platform.system().lower()

    if system == "windows":
        dangerous = {
            "windows", "system32", "syswow64", "boot", "recovery",
            "programdata", "$recycle.bin", "system volume information",
        }
        if path_parts & dangerous:
            return False
    elif system == "linux":
        for prefix in ["/etc", "/bin", "/sbin", "/lib", "/lib64",
                       "/boot", "/dev", "/proc", "/sys", "/root"]:
            if resolved == prefix or resolved.startswith(prefix + os.sep):
                return False
    elif system == "darwin":
        for prefix in ["/System", "/Library", "/private/var", "/usr"]:
            if resolved == prefix or resolved.startswith(prefix + os.sep):
                return False

    if ".." in path_str:
        return False
    if not os.path.exists(resolved):
        return False
    return True

Then CreateSubAgentTool becomes simpler and more testable:

from astrbot.core.utils.path_safety import is_safe_workdir

@dataclass
class CreateSubAgentTool(FunctionTool):
    ...
    async def call(self, context, **kwargs) -> str:
        ...
        workdir = kwargs.get("workdir")
        if not workdir or not is_safe_workdir(workdir):
            workdir = get_astrbot_temp_path()
        ...

This removes system/path concerns from the tool class and centralizes safety logic in one place.

Comment thread astrbot/core/astr_agent_tool_exec.py
Comment thread astrbot/core/agent/runners/tool_loop_agent_runner.py
@elecvoid243
Copy link
Copy Markdown
Author

重构实现,将原版和改版的subagent管理进行了统一,并通过配置逐项开关功能,满足不同任务下的定制化使用方式:临时的用完即弃型subagent / 复用于多轮对话的长任务subagent / 静态的预定义型subagent / 运行时动态创建的subagent等

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Subagent识别不到Skill [Feature] 增强版的SubAgent功能

1 participant