diff --git a/astrbot/core/cron/events.py b/astrbot/core/cron/events.py index a90ca38227..2b1de24b8a 100644 --- a/astrbot/core/cron/events.py +++ b/astrbot/core/cron/events.py @@ -25,8 +25,9 @@ def __init__( extras: dict[str, Any] | None = None, message_type: MessageType = MessageType.FRIEND_MESSAGE, ) -> None: + # 使用会话的平台名称而非固定的 "cron",确保插件能够正确识别平台类型 platform_meta = PlatformMetadata( - name="cron", + name=session.platform_name, description="CronJob", id=session.platform_id, ) diff --git a/astrbot/core/cron/manager.py b/astrbot/core/cron/manager.py index aa11bb601f..c38204ff99 100644 --- a/astrbot/core/cron/manager.py +++ b/astrbot/core/cron/manager.py @@ -1,5 +1,5 @@ import asyncio -import json +from asyncio import Queue from collections.abc import Awaitable, Callable from datetime import datetime, timezone from typing import TYPE_CHECKING, Any @@ -10,13 +10,10 @@ from apscheduler.triggers.date import DateTrigger from astrbot import logger -from astrbot.core.agent.tool import ToolSet from astrbot.core.cron.events import CronMessageEvent from astrbot.core.db import BaseDatabase from astrbot.core.db.po import CronJob from astrbot.core.platform.message_session import MessageSession -from astrbot.core.provider.entites import ProviderRequest -from astrbot.core.utils.history_saver import persist_agent_history if TYPE_CHECKING: from astrbot.core.star.context import Context @@ -41,6 +38,8 @@ def __init__(self, db: BaseDatabase) -> None: async def start(self, ctx: "Context") -> None: self.ctx: Context = ctx # star context async with self._lock: + # 从 Context 获取事件队列,用于将定时任务消息放入管道 + self._event_queue: Queue = ctx.get_event_queue() if self._started: return self.scheduler.start() @@ -266,29 +265,22 @@ async def _run_active_agent_job(self, job: CronJob, start_time: datetime) -> Non "cron_payload": payload, } - await self._woke_main_agent( + # 将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程 + # 这样插件的 on_llm_response 等处理器可以正常拦截和处理消息 + await self._dispatch_to_pipeline( message=note, session_str=session_str, extras=extras, ) - async def _woke_main_agent( + async def _dispatch_to_pipeline( self, *, message: str, session_str: str, extras: dict, ) -> None: - """Woke the main agent to handle the cron job message.""" - from astrbot.core.astr_main_agent import ( - MainAgentBuildConfig, - _get_session_conv, - build_main_agent, - ) - from astrbot.core.astr_main_agent_resources import ( - PROACTIVE_AGENT_CRON_WOKE_SYSTEM_PROMPT, - ) - from astrbot.core.tools.message_tools import SendMessageToUserTool + # 将定时任务消息放入事件队列,由 PipelineScheduler 统一处理。 try: session = ( @@ -319,76 +311,15 @@ async def _woke_main_agent( if cron_payload.get("origin", "tool") == "api": cron_event.role = "admin" - tool_call_timeout = cfg.get("provider_settings", {}).get( - "tool_call_timeout", 120 - ) - config = MainAgentBuildConfig( - tool_call_timeout=tool_call_timeout, - llm_safety_mode=False, - streaming_response=False, - ) - req = ProviderRequest() - conv = await _get_session_conv(event=cron_event, plugin_context=self.ctx) - req.conversation = conv - # finetine the messages - context = json.loads(conv.history) - if context: - req.contexts = context - context_dump = req._print_friendly_context() - req.contexts = [] - req.system_prompt += ( - "\n\nBellow is you and user previous conversation history:\n" - f"---\n" - f"{context_dump}\n" - f"---\n" - ) - cron_job_str = json.dumps(extras.get("cron_job", {}), ensure_ascii=False) - req.system_prompt += PROACTIVE_AGENT_CRON_WOKE_SYSTEM_PROMPT.format( - cron_job=cron_job_str - ) - req.prompt = ( - "You are now responding to a scheduled task. " - "Proceed according to your system instructions. " - "Output using same language as previous conversation. " - "After completing your task, summarize and output your actions and results." - ) - if not req.func_tool: - req.func_tool = ToolSet() - req.func_tool.add_tool( - self.ctx.get_llm_tool_manager().get_builtin_tool(SendMessageToUserTool) - ) - - result = await build_main_agent( - event=cron_event, plugin_context=self.ctx, config=config, req=req - ) - if not result: - logger.error("Failed to build main agent for cron job.") - return - - runner = result.agent_runner - async for _ in runner.step_until_done(30): - # agent will send message to user via using tools - pass - llm_resp = runner.get_final_llm_resp() - cron_meta = extras.get("cron_job", {}) if extras else {} - summary_note = ( - f"[CronJob] {cron_meta.get('name') or cron_meta.get('id', 'unknown')}: {cron_meta.get('description', '')} " - f" triggered at {cron_meta.get('run_started_at', 'unknown time')}, " + # 将事件放入事件队列,由 PipelineScheduler 处理 + # 不再直接调用 build_main_agent,避免双重消息 + await self._event_queue.put(cron_event) + logger.debug( + f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline (hooks triggered)." ) - if llm_resp and llm_resp.role == "assistant": - summary_note += ( - f"I finished this job, here is the result: {llm_resp.completion_text}" - ) - - await persist_agent_history( - self.ctx.conversation_manager, - event=cron_event, - req=req, - summary_note=summary_note, - ) - if not llm_resp: - logger.warning("Cron job agent got no response") - return + # 原始的_woke_main_agent 手动调用 persist_agent_history() + # PipelineScheduler 的 internal.py 自动调用 _save_to_history() + # 功能完整保留,且更简洁 __all__ = ["CronJobManager"]