Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion astrbot/core/cron/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ def __init__(
extras: dict[str, Any] | None = None,
message_type: MessageType = MessageType.FRIEND_MESSAGE,
) -> None:
# 使用会话的平台名称而非固定的 "cron",确保插件能够正确识别平台类型
platform_meta = PlatformMetadata(
name="cron",
name=session.platform_name,
description="CronJob",
id=session.platform_id,
)
Expand Down
101 changes: 16 additions & 85 deletions astrbot/core/cron/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
import json
from asyncio import Queue
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
Expand All @@ -10,13 +10,10 @@
from apscheduler.triggers.date import DateTrigger

from astrbot import logger
from astrbot.core.agent.tool import ToolSet
from astrbot.core.cron.events import CronMessageEvent
from astrbot.core.db import BaseDatabase
from astrbot.core.db.po import CronJob
from astrbot.core.platform.message_session import MessageSession
from astrbot.core.provider.entites import ProviderRequest
from astrbot.core.utils.history_saver import persist_agent_history

if TYPE_CHECKING:
from astrbot.core.star.context import Context
Expand All @@ -41,6 +38,8 @@ def __init__(self, db: BaseDatabase) -> None:
async def start(self, ctx: "Context") -> None:
self.ctx: Context = ctx # star context
async with self._lock:
# 从 Context 获取事件队列,用于将定时任务消息放入管道
self._event_queue: Queue = ctx.get_event_queue()
if self._started:
return
self.scheduler.start()
Expand Down Expand Up @@ -266,29 +265,22 @@ async def _run_active_agent_job(self, job: CronJob, start_time: datetime) -> Non
"cron_payload": payload,
}

await self._woke_main_agent(
# 将定时任务消息放入事件队列,使其经过完整的 PipelineScheduler 流程
# 这样插件的 on_llm_response 等处理器可以正常拦截和处理消息
await self._dispatch_to_pipeline(
message=note,
session_str=session_str,
extras=extras,
)

async def _woke_main_agent(
async def _dispatch_to_pipeline(
self,
*,
message: str,
session_str: str,
extras: dict,
) -> None:
"""Woke the main agent to handle the cron job message."""
from astrbot.core.astr_main_agent import (
MainAgentBuildConfig,
_get_session_conv,
build_main_agent,
)
from astrbot.core.astr_main_agent_resources import (
PROACTIVE_AGENT_CRON_WOKE_SYSTEM_PROMPT,
)
from astrbot.core.tools.message_tools import SendMessageToUserTool
# 将定时任务消息放入事件队列,由 PipelineScheduler 统一处理。

try:
session = (
Expand Down Expand Up @@ -319,76 +311,15 @@ async def _woke_main_agent(
if cron_payload.get("origin", "tool") == "api":
cron_event.role = "admin"

tool_call_timeout = cfg.get("provider_settings", {}).get(
"tool_call_timeout", 120
)
config = MainAgentBuildConfig(
tool_call_timeout=tool_call_timeout,
llm_safety_mode=False,
streaming_response=False,
)
req = ProviderRequest()
conv = await _get_session_conv(event=cron_event, plugin_context=self.ctx)
req.conversation = conv
# finetine the messages
context = json.loads(conv.history)
if context:
req.contexts = context
context_dump = req._print_friendly_context()
req.contexts = []
req.system_prompt += (
"\n\nBellow is you and user previous conversation history:\n"
f"---\n"
f"{context_dump}\n"
f"---\n"
)
cron_job_str = json.dumps(extras.get("cron_job", {}), ensure_ascii=False)
req.system_prompt += PROACTIVE_AGENT_CRON_WOKE_SYSTEM_PROMPT.format(
cron_job=cron_job_str
)
req.prompt = (
"You are now responding to a scheduled task. "
"Proceed according to your system instructions. "
"Output using same language as previous conversation. "
"After completing your task, summarize and output your actions and results."
)
if not req.func_tool:
req.func_tool = ToolSet()
req.func_tool.add_tool(
self.ctx.get_llm_tool_manager().get_builtin_tool(SendMessageToUserTool)
)

result = await build_main_agent(
event=cron_event, plugin_context=self.ctx, config=config, req=req
)
if not result:
logger.error("Failed to build main agent for cron job.")
return

runner = result.agent_runner
async for _ in runner.step_until_done(30):
# agent will send message to user via using tools
pass
llm_resp = runner.get_final_llm_resp()
cron_meta = extras.get("cron_job", {}) if extras else {}
summary_note = (
f"[CronJob] {cron_meta.get('name') or cron_meta.get('id', 'unknown')}: {cron_meta.get('description', '')} "
f" triggered at {cron_meta.get('run_started_at', 'unknown time')}, "
# 将事件放入事件队列,由 PipelineScheduler 处理
# 不再直接调用 build_main_agent,避免双重消息
await self._event_queue.put(cron_event)
logger.debug(
f"Cron job {extras.get('cron_job', {}).get('id')} dispatched to pipeline (hooks triggered)."
)
if llm_resp and llm_resp.role == "assistant":
summary_note += (
f"I finished this job, here is the result: {llm_resp.completion_text}"
)

await persist_agent_history(
self.ctx.conversation_manager,
event=cron_event,
req=req,
summary_note=summary_note,
)
if not llm_resp:
logger.warning("Cron job agent got no response")
return
# 原始的_woke_main_agent 手动调用 persist_agent_history()
# PipelineScheduler 的 internal.py 自动调用 _save_to_history()
# 功能完整保留,且更简洁


__all__ = ["CronJobManager"]
Loading