From 689af19e108ac48499afc62bd8566ca0fac3a542 Mon Sep 17 00:00:00 2001 From: kaixinyujue <2465367308@qq.com> Date: Sat, 18 Apr 2026 21:02:14 +0800 Subject: [PATCH 1/4] fix: enable plugin hooks for active_agent cron jobs by dispatching through PipelineScheduler Previously, active_agent cron jobs called _woke_main_agent() directly, bypassing the PipelineScheduler pipeline. This caused plugin hooks (on_llm_response, on_decorating_result, etc.) to not be triggered, and emotion tags were not parsed by plugins like meme_manager. Changes: - core/cron/events.py: Use session.platform_name for PlatformMetadata.name - core/cron/manager.py: Add _dispatch_to_pipeline() to route cron jobs through the standard event queue instead of direct agent wake --- astrbot/core/cron/events.py | 3 +- astrbot/core/cron/manager.py | 53 +++++++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/astrbot/core/cron/events.py b/astrbot/core/cron/events.py index a90ca38227..2b1de24b8a 100644 --- a/astrbot/core/cron/events.py +++ b/astrbot/core/cron/events.py @@ -25,8 +25,9 @@ def __init__( extras: dict[str, Any] | None = None, message_type: MessageType = MessageType.FRIEND_MESSAGE, ) -> None: + # 使用会话的平台名称而非固定的 "cron",确保插件能够正确识别平台类型 platform_meta = PlatformMetadata( - name="cron", + name=session.platform_name, description="CronJob", id=session.platform_id, ) diff --git a/astrbot/core/cron/manager.py b/astrbot/core/cron/manager.py index aa11bb601f..e54a2aee44 100644 --- a/astrbot/core/cron/manager.py +++ b/astrbot/core/cron/manager.py @@ -4,6 +4,7 @@ from datetime import datetime, timezone from typing import TYPE_CHECKING, Any from zoneinfo import ZoneInfo +from asyncio import Queue from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger @@ -41,6 +42,8 @@ def __init__(self, db: BaseDatabase) -> None: async def start(self, ctx: "Context") -> None: self.ctx: Context = ctx # star context async with self._lock: + # 从 Context 获取事件队列,用于将定时任务消息放入管道 + self._event_queue: Queue = ctx.get_event_queue() if self._started: return self.scheduler.start() @@ -266,7 +269,9 @@ async def _run_active_agent_job(self, job: CronJob, start_time: datetime) -> Non "cron_payload": payload, } - await self._woke_main_agent( + # 将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程 + # 这样插件的 on_llm_response 等处理器可以正常拦截和处理消息 + await self._dispatch_to_pipeline( message=note, session_str=session_str, extras=extras, @@ -390,5 +395,51 @@ async def _woke_main_agent( logger.warning("Cron job agent got no response") return + async def _dispatch_to_pipeline( + self, + *, + message: str, + session_str: str, + extras: dict, + ) -> None: + """将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程。""" + from astrbot.core.cron.events import CronMessageEvent + from astrbot.core.platform.message_session import MessageSession + + try: + session = ( + session_str + if isinstance(session_str, MessageSession) + else MessageSession.from_str(session_str) + ) + except Exception as e: + logger.error(f"Invalid session for cron job: {e}") + return + + cron_event = CronMessageEvent( + context=self.ctx, + session=session, + message=message, + extras=extras or {}, + message_type=session.message_type, + ) + + # 判断用户角色 + umo = cron_event.unified_msg_origin + cfg = self.ctx.get_config(umo=umo) + cron_payload = extras.get("cron_payload", {}) if extras else {} + sender_id = cron_payload.get("sender_id") + admin_ids = cfg.get("admins_id", []) + if admin_ids: + cron_event.role = "admin" if sender_id in admin_ids else "member" + if cron_payload.get("origin", "tool") == "api": + cron_event.role = "admin" + + # 将事件放入事件队列,由 EventBus 调度到 PipelineScheduler + await self._event_queue.put(cron_event) + logger.debug( + f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline." + ) + __all__ = ["CronJobManager"] From 061a51b02ebe24a7a1e3ac3589081a738be61748 Mon Sep 17 00:00:00 2001 From: kaixinyujue <2465367308@qq.com> Date: Sat, 18 Apr 2026 21:30:16 +0800 Subject: [PATCH 2/4] fix: enable plugin hooks for active_agent cron jobs by dispatching through PipelineScheduler Fixes #7419 Previously, active_agent cron jobs called _woke_main_agent() directly, bypassing the PipelineScheduler pipeline. This caused plugin hooks (on_llm_response, on_decorating_result, etc.) to not be triggered, and emotion tags were not parsed by plugins like meme_manager. Changes: - core/cron/events.py: Use session.platform_name for PlatformMetadata.name - core/cron/manager.py: Add _dispatch_to_pipeline() to route cron jobs through the standard event queue instead of direct agent wake --- astrbot/core/cron/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/astrbot/core/cron/manager.py b/astrbot/core/cron/manager.py index e54a2aee44..6d23f98555 100644 --- a/astrbot/core/cron/manager.py +++ b/astrbot/core/cron/manager.py @@ -1,10 +1,10 @@ import asyncio import json +from asyncio import Queue from collections.abc import Awaitable, Callable from datetime import datetime, timezone from typing import TYPE_CHECKING, Any from zoneinfo import ZoneInfo -from asyncio import Queue from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger From ca4798bd6a88d1787b2715ac69ba7edb187cb393 Mon Sep 17 00:00:00 2001 From: kaixinyujue <2465367308@qq.com> Date: Sat, 18 Apr 2026 22:44:15 +0800 Subject: [PATCH 3/4] test3 --- astrbot/core/cron/manager.py | 64 +++++++++--------------------------- 1 file changed, 16 insertions(+), 48 deletions(-) diff --git a/astrbot/core/cron/manager.py b/astrbot/core/cron/manager.py index 6d23f98555..19a6c6a361 100644 --- a/astrbot/core/cron/manager.py +++ b/astrbot/core/cron/manager.py @@ -277,7 +277,7 @@ async def _run_active_agent_job(self, job: CronJob, start_time: datetime) -> Non extras=extras, ) - async def _woke_main_agent( + async def _dispatch_to_pipeline( self, *, message: str, @@ -324,6 +324,15 @@ async def _woke_main_agent( if cron_payload.get("origin", "tool") == "api": cron_event.role = "admin" + # 步骤 1: 将事件放入事件队列,触发插件钩子(如 on_llm_response, on_decorating_result) + # 设置 skip_llm 标志,提示 Pipeline 跳过 LLM 调用(需要 Pipeline 支持) + cron_event.set_extra("skip_llm", True) + await self._event_queue.put(cron_event) + logger.debug( + f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline (hooks triggered)." + ) + + # 步骤 2: 直接调用 build_main_agent 处理 LLM 请求(注入系统提示词) tool_call_timeout = cfg.get("provider_settings", {}).get( "tool_call_timeout", 120 ) @@ -335,7 +344,8 @@ async def _woke_main_agent( req = ProviderRequest() conv = await _get_session_conv(event=cron_event, plugin_context=self.ctx) req.conversation = conv - # finetine the messages + + # 注入历史对话上下文 context = json.loads(conv.history) if context: req.contexts = context @@ -347,6 +357,8 @@ async def _woke_main_agent( f"{context_dump}\n" f"---\n" ) + + # 注入 cron 任务系统提示词 cron_job_str = json.dumps(extras.get("cron_job", {}), ensure_ascii=False) req.system_prompt += PROACTIVE_AGENT_CRON_WOKE_SYSTEM_PROMPT.format( cron_job=cron_job_str @@ -375,6 +387,8 @@ async def _woke_main_agent( # agent will send message to user via using tools pass llm_resp = runner.get_final_llm_resp() + + # 步骤 3: 保存历史记录(含 cron 元数据) cron_meta = extras.get("cron_job", {}) if extras else {} summary_note = ( f"[CronJob] {cron_meta.get('name') or cron_meta.get('id', 'unknown')}: {cron_meta.get('description', '')} " @@ -395,51 +409,5 @@ async def _woke_main_agent( logger.warning("Cron job agent got no response") return - async def _dispatch_to_pipeline( - self, - *, - message: str, - session_str: str, - extras: dict, - ) -> None: - """将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程。""" - from astrbot.core.cron.events import CronMessageEvent - from astrbot.core.platform.message_session import MessageSession - - try: - session = ( - session_str - if isinstance(session_str, MessageSession) - else MessageSession.from_str(session_str) - ) - except Exception as e: - logger.error(f"Invalid session for cron job: {e}") - return - - cron_event = CronMessageEvent( - context=self.ctx, - session=session, - message=message, - extras=extras or {}, - message_type=session.message_type, - ) - - # 判断用户角色 - umo = cron_event.unified_msg_origin - cfg = self.ctx.get_config(umo=umo) - cron_payload = extras.get("cron_payload", {}) if extras else {} - sender_id = cron_payload.get("sender_id") - admin_ids = cfg.get("admins_id", []) - if admin_ids: - cron_event.role = "admin" if sender_id in admin_ids else "member" - if cron_payload.get("origin", "tool") == "api": - cron_event.role = "admin" - - # 将事件放入事件队列,由 EventBus 调度到 PipelineScheduler - await self._event_queue.put(cron_event) - logger.debug( - f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline." - ) - __all__ = ["CronJobManager"] From c7dd6159e6f90cc657132409ca54d55cb28840f9 Mon Sep 17 00:00:00 2001 From: kaixinyujue <2465367308@qq.com> Date: Sat, 18 Apr 2026 22:58:26 +0800 Subject: [PATCH 4/4] fix: enable plugin hooks for active_agent cron jobs by dispatching through PipelineScheduler Fixes #7419 Previously, active_agent cron jobs called _woke_main_agent() directly, bypassing the PipelineScheduler pipeline. This caused plugin hooks to not be triggered, and emotion tags were not parsed by plugins meme_manager. Changes: - core/cron/events.py: Use session.platform_name for PlatformMetadata.name - core/cron/manager.py: Add _dispatch_to_pipeline() to route cron jobs through the standard event queue instead of direct agent wake - core/cron/manager.py: Remove legacy _woke_main_agent logic as it is no longer needed after pipeline integration - core/cron/manager.py: Preserve history saving functionality via PipelineScheduler's automatic _save_to_history() call The _woke_main_agent approach has been fully replaced. History persistence is now handled automatically by PipelineScheduler after agent execution, eliminating the need for explicit persist_agent_history() calls. --- astrbot/core/cron/manager.py | 100 +++-------------------------------- 1 file changed, 6 insertions(+), 94 deletions(-) diff --git a/astrbot/core/cron/manager.py b/astrbot/core/cron/manager.py index 19a6c6a361..c38204ff99 100644 --- a/astrbot/core/cron/manager.py +++ b/astrbot/core/cron/manager.py @@ -1,5 +1,4 @@ import asyncio -import json from asyncio import Queue from collections.abc import Awaitable, Callable from datetime import datetime, timezone @@ -11,13 +10,10 @@ from apscheduler.triggers.date import DateTrigger from astrbot import logger -from astrbot.core.agent.tool import ToolSet from astrbot.core.cron.events import CronMessageEvent from astrbot.core.db import BaseDatabase from astrbot.core.db.po import CronJob from astrbot.core.platform.message_session import MessageSession -from astrbot.core.provider.entites import ProviderRequest -from astrbot.core.utils.history_saver import persist_agent_history if TYPE_CHECKING: from astrbot.core.star.context import Context @@ -284,16 +280,7 @@ async def _dispatch_to_pipeline( session_str: str, extras: dict, ) -> None: - """Woke the main agent to handle the cron job message.""" - from astrbot.core.astr_main_agent import ( - MainAgentBuildConfig, - _get_session_conv, - build_main_agent, - ) - from astrbot.core.astr_main_agent_resources import ( - PROACTIVE_AGENT_CRON_WOKE_SYSTEM_PROMPT, - ) - from astrbot.core.tools.message_tools import SendMessageToUserTool + # 将定时任务消息放入事件队列,由 PipelineScheduler 统一处理。 try: session = ( @@ -324,90 +311,15 @@ async def _dispatch_to_pipeline( if cron_payload.get("origin", "tool") == "api": cron_event.role = "admin" - # 步骤 1: 将事件放入事件队列,触发插件钩子(如 on_llm_response, on_decorating_result) - # 设置 skip_llm 标志,提示 Pipeline 跳过 LLM 调用(需要 Pipeline 支持) - cron_event.set_extra("skip_llm", True) + # 将事件放入事件队列,由 PipelineScheduler 处理 + # 不再直接调用 build_main_agent,避免双重消息 await self._event_queue.put(cron_event) logger.debug( f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline (hooks triggered)." ) - - # 步骤 2: 直接调用 build_main_agent 处理 LLM 请求(注入系统提示词) - tool_call_timeout = cfg.get("provider_settings", {}).get( - "tool_call_timeout", 120 - ) - config = MainAgentBuildConfig( - tool_call_timeout=tool_call_timeout, - llm_safety_mode=False, - streaming_response=False, - ) - req = ProviderRequest() - conv = await _get_session_conv(event=cron_event, plugin_context=self.ctx) - req.conversation = conv - - # 注入历史对话上下文 - context = json.loads(conv.history) - if context: - req.contexts = context - context_dump = req._print_friendly_context() - req.contexts = [] - req.system_prompt += ( - "\n\nBellow is you and user previous conversation history:\n" - f"---\n" - f"{context_dump}\n" - f"---\n" - ) - - # 注入 cron 任务系统提示词 - cron_job_str = json.dumps(extras.get("cron_job", {}), ensure_ascii=False) - req.system_prompt += PROACTIVE_AGENT_CRON_WOKE_SYSTEM_PROMPT.format( - cron_job=cron_job_str - ) - req.prompt = ( - "You are now responding to a scheduled task. " - "Proceed according to your system instructions. " - "Output using same language as previous conversation. " - "After completing your task, summarize and output your actions and results." - ) - if not req.func_tool: - req.func_tool = ToolSet() - req.func_tool.add_tool( - self.ctx.get_llm_tool_manager().get_builtin_tool(SendMessageToUserTool) - ) - - result = await build_main_agent( - event=cron_event, plugin_context=self.ctx, config=config, req=req - ) - if not result: - logger.error("Failed to build main agent for cron job.") - return - - runner = result.agent_runner - async for _ in runner.step_until_done(30): - # agent will send message to user via using tools - pass - llm_resp = runner.get_final_llm_resp() - - # 步骤 3: 保存历史记录(含 cron 元数据) - cron_meta = extras.get("cron_job", {}) if extras else {} - summary_note = ( - f"[CronJob] {cron_meta.get('name') or cron_meta.get('id', 'unknown')}: {cron_meta.get('description', '')} " - f" triggered at {cron_meta.get('run_started_at', 'unknown time')}, " - ) - if llm_resp and llm_resp.role == "assistant": - summary_note += ( - f"I finished this job, here is the result: {llm_resp.completion_text}" - ) - - await persist_agent_history( - self.ctx.conversation_manager, - event=cron_event, - req=req, - summary_note=summary_note, - ) - if not llm_resp: - logger.warning("Cron job agent got no response") - return + # 原始的_woke_main_agent 手动调用 persist_agent_history() + # PipelineScheduler 的 internal.py 自动调用 _save_to_history() + # 功能完整保留,且更简洁 __all__ = ["CronJobManager"]