diff --git a/docs/ai-chat/backend.mdx b/docs/ai-chat/backend.mdx new file mode 100644 index 0000000000..5c21e88ee6 --- /dev/null +++ b/docs/ai-chat/backend.mdx @@ -0,0 +1,856 @@ +--- +title: "Backend" +sidebarTitle: "Backend" +description: "Three approaches to building your chat backend — chat.task(), session iterator, or raw task primitives." +--- + +## chat.task() + +The highest-level approach. Handles message accumulation, stop signals, turn lifecycle, and auto-piping automatically. + +### Simple: return a StreamTextResult + +Return the `streamText` result from `run` and it's automatically piped to the frontend: + +```ts +import { chat } from "@trigger.dev/sdk/ai"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; + +export const simpleChat = chat.task({ + id: "simple-chat", + run: async ({ messages, signal }) => { + return streamText({ + model: openai("gpt-4o"), + system: "You are a helpful assistant.", + messages, + abortSignal: signal, + }); + }, +}); +``` + +### Using chat.pipe() for complex flows + +For complex agent flows where `streamText` is called deep inside your code, use `chat.pipe()`. It works from **anywhere inside a task** — even nested function calls. + +```ts trigger/agent-chat.ts +import { chat } from "@trigger.dev/sdk/ai"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; +import type { ModelMessage } from "ai"; + +export const agentChat = chat.task({ + id: "agent-chat", + run: async ({ messages }) => { + // Don't return anything — chat.pipe is called inside + await runAgentLoop(messages); + }, +}); + +async function runAgentLoop(messages: ModelMessage[]) { + // ... agent logic, tool calls, etc. + + const result = streamText({ + model: openai("gpt-4o"), + messages, + }); + + // Pipe from anywhere — no need to return it + await chat.pipe(result); +} +``` + +### Lifecycle hooks + +#### onPreload + +Fires when a preloaded run starts — before any messages arrive. 
Use it to eagerly initialize state (DB records, user context) while the user is still typing. + +Preloaded runs are triggered by calling `transport.preload(chatId)` on the frontend. See [Preload](/ai-chat/features#preload) for details. + +```ts +export const myChat = chat.task({ + id: "my-chat", + clientDataSchema: z.object({ userId: z.string() }), + onPreload: async ({ chatId, clientData, runId, chatAccessToken }) => { + // Initialize early — before the first message arrives + const user = await db.user.findUnique({ where: { id: clientData.userId } }); + userContext.init({ name: user.name, plan: user.plan }); + + await db.chat.create({ data: { id: chatId, userId: clientData.userId } }); + await db.chatSession.upsert({ + where: { id: chatId }, + create: { id: chatId, runId, publicAccessToken: chatAccessToken }, + update: { runId, publicAccessToken: chatAccessToken }, + }); + }, + onChatStart: async ({ preloaded }) => { + if (preloaded) return; // Already initialized in onPreload + // ... non-preloaded initialization + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + +| Field | Type | Description | +|-------|------|-------------| +| `chatId` | `string` | Chat session ID | +| `runId` | `string` | The Trigger.dev run ID | +| `chatAccessToken` | `string` | Scoped access token for this run | +| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend | + +#### onChatStart + +Fires once on the first turn (turn 0) before `run()` executes. Use it to create a chat record in your database. + +The `continuation` field tells you whether this is a brand new chat or a continuation of an existing one (where the previous run timed out or was cancelled). The `preloaded` field tells you whether `onPreload` already ran. 
+ +```ts +export const myChat = chat.task({ + id: "my-chat", + onChatStart: async ({ chatId, clientData, continuation, preloaded }) => { + if (preloaded) return; // Already set up in onPreload + if (continuation) return; // Chat record already exists + + const { userId } = clientData as { userId: string }; + await db.chat.create({ + data: { id: chatId, userId, title: "New chat" }, + }); + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + + + `clientData` contains custom data from the frontend — either the `clientData` option on the transport constructor (sent with every message) or the `metadata` option on `sendMessage()` (per-message). See [Client data and metadata](/ai-chat/frontend#client-data-and-metadata). + + +#### onTurnStart + +Fires at the start of every turn, after message accumulation and `onChatStart` (turn 0), but **before** `run()` executes. Use it to persist messages before streaming begins — so a mid-stream page refresh still shows the user's message. 
+ +| Field | Type | Description | +|-------|------|-------------| +| `chatId` | `string` | Chat session ID | +| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) | +| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) | +| `turn` | `number` | Turn number (0-indexed) | +| `runId` | `string` | The Trigger.dev run ID | +| `chatAccessToken` | `string` | Scoped access token for this run | +| `continuation` | `boolean` | Whether this run is continuing an existing chat | +| `preloaded` | `boolean` | Whether this run was preloaded | +| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend | + +```ts +export const myChat = chat.task({ + id: "my-chat", + onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => { + await db.chat.update({ + where: { id: chatId }, + data: { messages: uiMessages }, + }); + await db.chatSession.upsert({ + where: { id: chatId }, + create: { id: chatId, runId, publicAccessToken: chatAccessToken }, + update: { runId, publicAccessToken: chatAccessToken }, + }); + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + + + By persisting in `onTurnStart`, the user's message is saved to your database before the AI starts streaming. If the user refreshes mid-stream, the message is already there. + + +#### onTurnComplete + +Fires after each turn completes — after the response is captured, before waiting for the next message. This is the primary hook for persisting the assistant's response. 
+ +| Field | Type | Description | +|-------|------|-------------| +| `chatId` | `string` | Chat session ID | +| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) | +| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) | +| `newMessages` | `ModelMessage[]` | Only this turn's messages (model format) | +| `newUIMessages` | `UIMessage[]` | Only this turn's messages (UI format) | +| `responseMessage` | `UIMessage \| undefined` | The assistant's response for this turn | +| `turn` | `number` | Turn number (0-indexed) | +| `runId` | `string` | The Trigger.dev run ID | +| `chatAccessToken` | `string` | Scoped access token for this run | +| `lastEventId` | `string \| undefined` | Stream position for resumption. Persist this with the session. | +| `stopped` | `boolean` | Whether the user stopped generation during this turn | +| `continuation` | `boolean` | Whether this run is continuing an existing chat | +| `rawResponseMessage` | `UIMessage \| undefined` | The raw assistant response before abort cleanup (same as `responseMessage` when not stopped) | + +```ts +export const myChat = chat.task({ + id: "my-chat", + onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => { + await db.chat.update({ + where: { id: chatId }, + data: { messages: uiMessages }, + }); + await db.chatSession.upsert({ + where: { id: chatId }, + create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId }, + update: { runId, publicAccessToken: chatAccessToken, lastEventId }, + }); + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + + + Use `uiMessages` to overwrite the full conversation each turn (simplest). Use `newUIMessages` if you prefer to store messages individually — for example, one database row per message. + + + + Persist `lastEventId` alongside the session. 
When the transport reconnects after a page refresh, it uses this to skip past already-seen events — preventing duplicate messages. + + +### Stop generation + +#### How stop works + +Calling `stop()` from `useChat` sends a stop signal to the running task via input streams. The task's `streamText` call aborts (if you passed `signal` or `stopSignal`), but the **run stays alive** and waits for the next message. The partial response is captured and accumulated normally. + +#### Abort signals + +The `run` function receives three abort signals: + +| Signal | Fires when | Use for | +|--------|-----------|---------| +| `signal` | Stop **or** cancel | Pass to `streamText` — handles both cases. **Use this in most cases.** | +| `stopSignal` | Stop only (per-turn, reset each turn) | Custom logic that should only run on user stop, not cancellation | +| `cancelSignal` | Run cancel, expire, or maxDuration exceeded | Cleanup that should only happen on full cancellation | + +```ts +export const myChat = chat.task({ + id: "my-chat", + run: async ({ messages, signal, stopSignal, cancelSignal }) => { + return streamText({ + model: openai("gpt-4o"), + messages, + abortSignal: signal, // Handles both stop and cancel + }); + }, +}); +``` + + + Use `signal` (the combined signal) in most cases. The separate `stopSignal` and `cancelSignal` are only needed if you want different behavior for stop vs cancel. + + +#### Detecting stop in callbacks + +The `onTurnComplete` event includes a `stopped` boolean that indicates whether the user stopped generation during that turn: + +```ts +export const myChat = chat.task({ + id: "my-chat", + onTurnComplete: async ({ chatId, uiMessages, stopped }) => { + await db.chat.update({ + where: { id: chatId }, + data: { messages: uiMessages, lastStoppedAt: stopped ? 
new Date() : undefined }, + }); + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + +You can also check stop status from **anywhere** during a turn using `chat.isStopped()`. This is useful inside `streamText`'s `onFinish` callback where the AI SDK's `isAborted` flag can be unreliable (e.g. when using `createUIMessageStream` + `writer.merge()`): + +```ts +import { chat } from "@trigger.dev/sdk/ai"; +import { streamText } from "ai"; + +export const myChat = chat.task({ + id: "my-chat", + run: async ({ messages, signal }) => { + return streamText({ + model: openai("gpt-4o"), + messages, + abortSignal: signal, + onFinish: ({ isAborted }) => { + // isAborted may be false even after stop when using createUIMessageStream + const wasStopped = isAborted || chat.isStopped(); + if (wasStopped) { + // handle stop — e.g. log analytics + } + }, + }); + }, +}); +``` + +#### Cleaning up aborted messages + +When stop happens mid-stream, the captured response message can contain parts in an incomplete state — tool calls stuck in `partial-call`, reasoning blocks still marked as `streaming`, etc. These can cause UI issues like permanent spinners. + +`chat.task` automatically cleans up the `responseMessage` when stop is detected before passing it to `onTurnComplete`. If you use `chat.pipe()` manually and capture response messages yourself, use `chat.cleanupAbortedParts()`: + +```ts +const cleaned = chat.cleanupAbortedParts(rawResponseMessage); +``` + +This removes tool invocation parts stuck in `partial-call` state and marks any `streaming` text or reasoning parts as `done`. + + + Stop signal delivery is best-effort. There is a small race window where the model may finish before the stop signal arrives, in which case the turn completes normally with `stopped: false`. This is expected and does not require special handling. 
+ + +### Persistence + +#### What needs to be persisted + +To build a chat app that survives page refreshes, you need to persist two things: + +1. **Messages** — The conversation history. Persisted **server-side** in the task via `onTurnStart` and `onTurnComplete`. +2. **Sessions** — The transport's connection state (`runId`, `publicAccessToken`, `lastEventId`). Persisted **server-side** via `onTurnStart` and `onTurnComplete`. + + + Sessions let the transport reconnect to an existing run after a page refresh. Without them, every page load would start a new run — losing the conversation context that was accumulated in the previous run. + + +#### Full persistence example + + +```ts trigger/chat.ts +import { chat } from "@trigger.dev/sdk/ai"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; +import { z } from "zod"; +import { db } from "@/lib/db"; + +export const myChat = chat.task({ + id: "my-chat", + clientDataSchema: z.object({ + userId: z.string(), + }), + onChatStart: async ({ chatId, clientData }) => { + await db.chat.create({ + data: { id: chatId, userId: clientData.userId, title: "New chat", messages: [] }, + }); + }, + onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => { + // Persist messages + session before streaming + await db.chat.update({ + where: { id: chatId }, + data: { messages: uiMessages }, + }); + await db.chatSession.upsert({ + where: { id: chatId }, + create: { id: chatId, runId, publicAccessToken: chatAccessToken }, + update: { runId, publicAccessToken: chatAccessToken }, + }); + }, + onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => { + // Persist assistant response + stream position + await db.chat.update({ + where: { id: chatId }, + data: { messages: uiMessages }, + }); + await db.chatSession.upsert({ + where: { id: chatId }, + create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId }, + update: { runId, publicAccessToken: 
chatAccessToken, lastEventId }, + }); + }, + run: async ({ messages, signal }) => { + return streamText({ + model: openai("gpt-4o"), + messages, + abortSignal: signal, + }); + }, +}); +``` + +```ts app/actions.ts +"use server"; + +import { chat } from "@trigger.dev/sdk/ai"; +import type { myChat } from "@/trigger/chat"; +import { db } from "@/lib/db"; + +export const getChatToken = () => + chat.createAccessToken("my-chat"); + +export async function getChatMessages(chatId: string) { + const found = await db.chat.findUnique({ where: { id: chatId } }); + return found?.messages ?? []; +} + +export async function getAllSessions() { + const sessions = await db.chatSession.findMany(); + const result: Record = {}; + for (const s of sessions) { + result[s.id] = { + runId: s.runId, + publicAccessToken: s.publicAccessToken, + lastEventId: s.lastEventId ?? undefined, + }; + } + return result; +} + +export async function deleteSession(chatId: string) { + await db.chatSession.delete({ where: { id: chatId } }).catch(() => {}); +} +``` + +```tsx app/components/chat.tsx +"use client"; + +import { useChat } from "@ai-sdk/react"; +import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react"; +import type { myChat } from "@/trigger/chat"; +import { getChatToken, deleteSession } from "@/app/actions"; + +export function Chat({ chatId, initialMessages, initialSessions }) { + const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: getChatToken, + clientData: { userId: currentUser.id }, // Type-checked against clientDataSchema + sessions: initialSessions, + onSessionChange: (id, session) => { + if (!session) deleteSession(id); + }, + }); + + const { messages, sendMessage, stop, status } = useChat({ + id: chatId, + messages: initialMessages, + transport, + resume: initialMessages.length > 0, + }); + + return ( +
+ {messages.map((m) => ( +
+ {m.role}: + {m.parts.map((part, i) => + part.type === "text" ? {part.text} : null + )} +
+ ))} + +
{ + e.preventDefault(); + const input = e.currentTarget.querySelector("input"); + if (input?.value) { + sendMessage({ text: input.value }); + input.value = ""; + } + }} + > + + + {status === "streaming" && ( + + )} +
+
+ ); +} +``` +
+ +### Runtime configuration + +#### chat.setTurnTimeout() + +Override how long the run stays suspended waiting for the next message. Call from inside `run()`: + +```ts +run: async ({ messages, signal }) => { + chat.setTurnTimeout("2h"); // Wait longer for this conversation + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); +}, +``` + +#### chat.setWarmTimeoutInSeconds() + +Override how long the run stays warm (active, using compute) after each turn: + +```ts +run: async ({ messages, signal }) => { + chat.setWarmTimeoutInSeconds(60); // Stay warm for 1 minute + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); +}, +``` + + + Longer warm timeout means faster responses but more compute usage. Set to `0` to suspend immediately after each turn (minimum latency cost, slight delay on next message). + + +#### Stream options + +Control how `streamText` results are converted to the frontend stream via `toUIMessageStream()`. Set static defaults on the task, or override per-turn. + +##### Error handling with onError + +When `streamText` encounters an error mid-stream (rate limits, API failures, network errors), the `onError` callback converts it to a string that's sent to the frontend as an `{ type: "error", errorText }` chunk. The AI SDK's `useChat` receives this via its `onError` callback. + +By default, the raw error message is sent to the frontend. Use `onError` to sanitize errors and avoid leaking internal details: + +```ts +export const myChat = chat.task({ + id: "my-chat", + uiMessageStreamOptions: { + onError: (error) => { + // Log the full error server-side for debugging + console.error("Stream error:", error); + // Return a sanitized message — this is what the frontend sees + if (error instanceof Error && error.message.includes("rate limit")) { + return "Rate limited — please wait a moment and try again."; + } + return "Something went wrong. 
Please try again."; + }, + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + +`onError` is also called for tool execution errors, so a single handler covers both LLM errors and tool failures. + +On the frontend, handle the error in `useChat`: + +```tsx +const { messages, sendMessage } = useChat({ + transport, + onError: (error) => { + // error.message contains the string returned by your onError handler + toast.error(error.message); + }, +}); +``` + +##### Reasoning and sources + +Control which AI SDK features are forwarded to the frontend: + +```ts +export const myChat = chat.task({ + id: "my-chat", + uiMessageStreamOptions: { + sendReasoning: true, // Forward model reasoning (default: true) + sendSources: true, // Forward source citations (default: false) + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + +##### Per-turn overrides + +Override per-turn with `chat.setUIMessageStreamOptions()` — per-turn values merge with the static config (per-turn wins on conflicts). The override is cleared automatically after each turn. + +```ts +run: async ({ messages, clientData, signal }) => { + // Enable reasoning only for certain models + if (clientData.model?.includes("claude")) { + chat.setUIMessageStreamOptions({ sendReasoning: true }); + } + return streamText({ model: openai(clientData.model ?? "gpt-4o"), messages, abortSignal: signal }); +}, +``` + +`chat.setUIMessageStreamOptions()` works across all abstraction levels — `chat.task()`, `chat.createSession()` / `turn.complete()`, and `chat.pipeAndCapture()`. + +See [ChatUIMessageStreamOptions](/ai-chat/reference#chatuimessagestreamoptions) for the full reference. + + + `onFinish` is managed internally for response capture and cannot be overridden here. 
Use `streamText`'s `onFinish` callback for custom finish handling, or use [raw task mode](#raw-task-with-primitives) for full control over `toUIMessageStream()`. + + +### Manual mode with task() + +If you need full control over task options, use the standard `task()` with `ChatTaskPayload` and `chat.pipe()`: + +```ts +import { task } from "@trigger.dev/sdk"; +import { chat, type ChatTaskPayload } from "@trigger.dev/sdk/ai"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; + +export const manualChat = task({ + id: "manual-chat", + retry: { maxAttempts: 3 }, + queue: { concurrencyLimit: 10 }, + run: async (payload: ChatTaskPayload) => { + const result = streamText({ + model: openai("gpt-4o"), + messages: payload.messages, + }); + + await chat.pipe(result); + }, +}); +``` + + + Manual mode does not get automatic message accumulation or the `onTurnComplete`/`onChatStart` lifecycle hooks. The `responseMessage` field in `onTurnComplete` will be `undefined` when using `chat.pipe()` directly. Use `chat.task()` for the full multi-turn experience. + + +--- + +## chat.createSession() + +A middle ground between `chat.task()` and raw primitives. You get an async iterator that yields `ChatTurn` objects — each turn handles stop signals, message accumulation, and turn-complete signaling automatically. You control initialization, model/tool selection, persistence, and any custom per-turn logic. 
+ +Use `chat.createSession()` inside a standard `task()`: + +```ts +import { task } from "@trigger.dev/sdk"; +import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; + +export const myChat = task({ + id: "my-chat", + run: async (payload: ChatTaskWirePayload, { signal }) => { + // One-time initialization — just code, no hooks + const clientData = payload.metadata as { userId: string }; + await db.chat.create({ data: { id: payload.chatId, userId: clientData.userId } }); + + const session = chat.createSession(payload, { + signal, + warmTimeoutInSeconds: 60, + timeout: "1h", + }); + + for await (const turn of session) { + const result = streamText({ + model: openai("gpt-4o"), + messages: turn.messages, + abortSignal: turn.signal, + }); + + // Pipe, capture, accumulate, and signal turn-complete — all in one call + await turn.complete(result); + + // Persist after each turn + await db.chat.update({ + where: { id: turn.chatId }, + data: { messages: turn.uiMessages }, + }); + } + }, +}); +``` + +### ChatSessionOptions + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `signal` | `AbortSignal` | required | Run-level cancel signal (from task context) | +| `warmTimeoutInSeconds` | `number` | `30` | Seconds to stay warm between turns | +| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | +| `maxTurns` | `number` | `100` | Max turns before ending | + +### ChatTurn + +Each turn yielded by the iterator provides: + +| Field | Type | Description | +|-------|------|-------------| +| `number` | `number` | Turn number (0-indexed) | +| `chatId` | `string` | Chat session ID | +| `trigger` | `string` | What triggered this turn | +| `clientData` | `unknown` | Client data from the transport | +| `messages` | `ModelMessage[]` | Full accumulated model messages — pass to `streamText` | +| `uiMessages` | `UIMessage[]` | Full accumulated UI 
messages — use for persistence | +| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) | +| `stopped` | `boolean` | Whether the user stopped generation this turn | +| `continuation` | `boolean` | Whether this is a continuation run | + +| Method | Description | +|--------|-------------| +| `turn.complete(source)` | Pipe stream, capture response, accumulate, and signal turn-complete | +| `turn.done()` | Just signal turn-complete (when you've piped manually) | +| `turn.addResponse(response)` | Add a response to the accumulator manually | + +### turn.complete() vs manual control + +`turn.complete(result)` is the easy path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts, and writing the turn-complete chunk. + +For more control, you can do each step manually: + +```ts +for await (const turn of session) { + const result = streamText({ + model: openai("gpt-4o"), + messages: turn.messages, + abortSignal: turn.signal, + }); + + // Manual: pipe and capture separately + const response = await chat.pipeAndCapture(result, { signal: turn.signal }); + + if (response) { + // Custom processing before accumulating + await turn.addResponse(response); + } + + // Custom persistence, analytics, etc. + await db.chat.update({ ... }); + + // Must call done() when not using complete() + await turn.done(); +} +``` + +--- + +## Raw task with primitives + +For full control, use a standard `task()` with the composable primitives from the `chat` namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling. + +Raw task mode also lets you call `.toUIMessageStream()` yourself with any options — including `onFinish` and `originalMessages`. This is the right choice when you need complete control over the stream conversion beyond what `chat.setUIMessageStreamOptions()` provides. 
+ +### Primitives + +| Primitive | Description | +|-----------|-------------| +| `chat.messages` | Input stream for incoming messages — use `.waitWithWarmup()` to wait for the next turn | +| `chat.createStopSignal()` | Create a managed stop signal wired to the stop input stream | +| `chat.pipeAndCapture(result)` | Pipe a `StreamTextResult` to the chat stream and capture the response | +| `chat.writeTurnComplete()` | Signal the frontend that the current turn is complete | +| `chat.MessageAccumulator` | Accumulates conversation messages across turns | +| `chat.pipe(stream)` | Pipe a stream to the frontend (no response capture) | +| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response | + +### Example + +```ts +import { task } from "@trigger.dev/sdk"; +import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; + +export const myChat = task({ + id: "my-chat-raw", + run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => { + let currentPayload = payload; + + // Handle preload — wait for the first real message + if (currentPayload.trigger === "preload") { + const result = await chat.messages.waitWithWarmup({ + warmTimeoutInSeconds: 60, + timeout: "1h", + spanName: "waiting for first message", + }); + if (!result.ok) return; + currentPayload = result.output; + } + + const stop = chat.createStopSignal(); + const conversation = new chat.MessageAccumulator(); + + for (let turn = 0; turn < 100; turn++) { + stop.reset(); + + const messages = await conversation.addIncoming( + currentPayload.messages, + currentPayload.trigger, + turn + ); + + const combinedSignal = AbortSignal.any([runSignal, stop.signal]); + + const result = streamText({ + model: openai("gpt-4o"), + messages, + abortSignal: combinedSignal, + }); + + let response; + try { + response = await chat.pipeAndCapture(result, { signal: combinedSignal }); + } catch (error) { + if (error 
instanceof Error && error.name === "AbortError") { + if (runSignal.aborted) break; + // Stop — fall through to accumulate partial + } else { + throw error; + } + } + + if (response) { + const cleaned = stop.signal.aborted && !runSignal.aborted + ? chat.cleanupAbortedParts(response) + : response; + await conversation.addResponse(cleaned); + } + + if (runSignal.aborted) break; + + // Persist, analytics, etc. + await db.chat.update({ + where: { id: currentPayload.chatId }, + data: { messages: conversation.uiMessages }, + }); + + await chat.writeTurnComplete(); + + // Wait for the next message + const next = await chat.messages.waitWithWarmup({ + warmTimeoutInSeconds: 60, + timeout: "1h", + spanName: "waiting for next message", + }); + if (!next.ok) break; + currentPayload = next.output; + } + + stop.cleanup(); + }, +}); +``` + +### MessageAccumulator + +The `MessageAccumulator` handles the transport protocol automatically: + +- Turn 0: replaces messages (full history from frontend) +- Subsequent turns: appends new messages (frontend only sends the new user message) +- Regenerate: replaces messages (full history minus last assistant message) + +```ts +const conversation = new chat.MessageAccumulator(); + +// Returns full accumulated ModelMessage[] for streamText +const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn); + +// After piping, add the response +const response = await chat.pipeAndCapture(result); +if (response) await conversation.addResponse(response); + +// Access accumulated messages for persistence +conversation.uiMessages; // UIMessage[] +conversation.modelMessages; // ModelMessage[] +``` diff --git a/docs/ai-chat/features.mdx b/docs/ai-chat/features.mdx new file mode 100644 index 0000000000..fd4b63789a --- /dev/null +++ b/docs/ai-chat/features.mdx @@ -0,0 +1,421 @@ +--- +title: "Features" +sidebarTitle: "Features" +description: "Per-run data, deferred work, custom streaming, subtask integration, and preload." 
+--- + +## Per-run data with chat.local + +Use `chat.local` to create typed, run-scoped data that persists across turns and is accessible from anywhere — the run function, tools, nested helpers. Each run gets its own isolated copy, and locals are automatically cleared between runs. + +When a subtask is invoked via `ai.tool()`, initialized locals are automatically serialized into the subtask's metadata and hydrated on first access — no extra code needed. Subtask changes to hydrated locals are local to the subtask and don't propagate back to the parent. + +### Declaring and initializing + +Declare locals at module level with a unique `id`, then initialize them inside a lifecycle hook where you have context (chatId, clientData, etc.): + +```ts +import { chat } from "@trigger.dev/sdk/ai"; +import { streamText, tool } from "ai"; +import { openai } from "@ai-sdk/openai"; +import { z } from "zod"; +import { db } from "@/lib/db"; + +// Declare at module level — each local needs a unique id +const userContext = chat.local<{ + name: string; + plan: "free" | "pro"; + messageCount: number; +}>({ id: "userContext" }); + +export const myChat = chat.task({ + id: "my-chat", + clientDataSchema: z.object({ userId: z.string() }), + onChatStart: async ({ clientData }) => { + // Initialize with real data from your database + const user = await db.user.findUnique({ + where: { id: clientData.userId }, + }); + userContext.init({ + name: user.name, + plan: user.plan, + messageCount: user.messageCount, + }); + }, + run: async ({ messages, signal }) => { + userContext.messageCount++; + + return streamText({ + model: openai("gpt-4o"), + system: `Helping ${userContext.name} (${userContext.plan} plan).`, + messages, + abortSignal: signal, + }); + }, +}); +``` + +### Accessing from tools + +Locals are accessible from anywhere during task execution — including AI SDK tools: + +```ts +const userContext = chat.local<{ plan: "free" | "pro" }>({ id: "userContext" }); + +const premiumTool = tool({ + 
description: "Access premium features", + inputSchema: z.object({ feature: z.string() }), + execute: async ({ feature }) => { + if (userContext.plan !== "pro") { + return { error: "This feature requires a Pro plan." }; + } + // ... premium logic + }, +}); +``` + +### Accessing from subtasks + +When you use `ai.tool()` to expose a subtask, chat locals are automatically available read-only: + +```ts +import { chat, ai } from "@trigger.dev/sdk/ai"; +import { schemaTask } from "@trigger.dev/sdk"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; +import { z } from "zod"; + +const userContext = chat.local<{ name: string; plan: "free" | "pro" }>({ id: "userContext" }); + +export const analyzeData = schemaTask({ + id: "analyze-data", + schema: z.object({ query: z.string() }), + run: async ({ query }) => { + // userContext.name just works — auto-hydrated from parent metadata + console.log(`Analyzing for ${userContext.name}`); + // Changes here are local to this subtask and don't propagate back + }, +}); + +export const myChat = chat.task({ + id: "my-chat", + onChatStart: async ({ clientData }) => { + userContext.init({ name: "Alice", plan: "pro" }); + }, + run: async ({ messages, signal }) => { + return streamText({ + model: openai("gpt-4o"), + messages, + tools: { analyzeData: ai.tool(analyzeData) }, + abortSignal: signal, + }); + }, +}); +``` + + + Values must be JSON-serializable for subtask access. Non-serializable values (functions, class instances, etc.) will be lost during transfer. + + +### Dirty tracking and persistence + +The `hasChanged()` method returns `true` if any property was set since the last check, then resets the flag. 
Use it in lifecycle hooks to only persist when data actually changed: + +```ts +onTurnComplete: async ({ chatId }) => { + if (userContext.hasChanged()) { + await db.user.update({ + where: { id: userContext.get().userId }, + data: { + messageCount: userContext.messageCount, + }, + }); + } +}, +``` + +### chat.local API + +| Method | Description | +|--------|-------------| +| `chat.local({ id })` | Create a typed local with a unique id (declare at module level) | +| `local.init(value)` | Initialize with a value (call in hooks or `run`) | +| `local.hasChanged()` | Returns `true` if modified since last check, resets flag | +| `local.get()` | Returns a plain object copy (for serialization) | +| `local.property` | Direct property access (read/write via Proxy) | + + + Locals use shallow proxying. Nested object mutations like `local.prefs.theme = "dark"` won't trigger the dirty flag. Instead, replace the whole property: `local.prefs = { ...local.prefs, theme: "dark" }`. + + +--- + +## chat.defer() + +Use `chat.defer()` to run background work in parallel with streaming. The deferred promise runs alongside the LLM response and is awaited (with a 5s timeout) before `onTurnComplete` fires. + +This moves non-blocking work (DB writes, analytics, etc.) out of the critical path: + +```ts +export const myChat = chat.task({ + id: "my-chat", + onTurnStart: async ({ chatId, uiMessages }) => { + // Persist messages without blocking the LLM call + chat.defer(db.chat.update({ where: { id: chatId }, data: { messages: uiMessages } })); + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + +`chat.defer()` can be called from anywhere during a turn — hooks, `run()`, or nested helpers. All deferred promises are collected and awaited together before `onTurnComplete`. + +--- + +## Custom streaming with chat.stream + +`chat.stream` is a typed stream bound to the chat output. 
Use it to write custom `UIMessageChunk` data alongside the AI-generated response — for example, status updates or progress indicators. + +```ts +import { chat } from "@trigger.dev/sdk/ai"; + +export const myChat = chat.task({ + id: "my-chat", + run: async ({ messages, signal }) => { + // Write a custom data part to the chat stream. + // The AI SDK's data-* chunk protocol adds this to message.parts + // on the frontend, where you can render it however you like. + const { waitUntilComplete } = chat.stream.writer({ + execute: ({ write }) => { + write({ + type: "data-status", + id: "search-progress", + data: { message: "Searching the web...", progress: 0.5 }, + }); + }, + }); + await waitUntilComplete(); + + // Then stream the AI response + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + + + Use `data-*` chunk types (e.g. `data-status`, `data-progress`) for custom data. The AI SDK processes these into `DataUIPart` objects in `message.parts` on the frontend. Writing the same `type` + `id` again updates the existing part instead of creating a new one — useful for live progress. 
+ + +`chat.stream` exposes the full stream API: + +| Method | Description | +|--------|-------------| +| `chat.stream.writer(options)` | Write individual chunks via a callback | +| `chat.stream.pipe(stream, options?)` | Pipe a `ReadableStream` or `AsyncIterable` | +| `chat.stream.append(value, options?)` | Append raw data | +| `chat.stream.read(runId, options?)` | Read the stream by run ID | + +### Streaming from subtasks + +When a tool invokes a subtask via `triggerAndWait`, the subtask can stream directly to the parent chat using `target: "root"`: + +```ts +import { chat, ai } from "@trigger.dev/sdk/ai"; +import { schemaTask } from "@trigger.dev/sdk"; +import { streamText, generateId } from "ai"; +import { z } from "zod"; + +// A subtask that streams progress back to the parent chat +export const researchTask = schemaTask({ + id: "research", + schema: z.object({ query: z.string() }), + run: async ({ query }) => { + const partId = generateId(); + + // Write a data-* chunk to the root run's chat stream. + // The frontend receives this as a DataUIPart in message.parts. + const { waitUntilComplete } = chat.stream.writer({ + target: "root", + execute: ({ write }) => { + write({ + type: "data-research-status", + id: partId, + data: { query, status: "in-progress" }, + }); + }, + }); + await waitUntilComplete(); + + // Do the work... 
+ const result = await doResearch(query); + + // Update the same part with the final status + const { waitUntilComplete: waitDone } = chat.stream.writer({ + target: "root", + execute: ({ write }) => { + write({ + type: "data-research-status", + id: partId, + data: { query, status: "done", resultCount: result.length }, + }); + }, + }); + await waitDone(); + + return result; + }, +}); + +// The chat task uses it as a tool via ai.tool() +export const myChat = chat.task({ + id: "my-chat", + run: async ({ messages, signal }) => { + return streamText({ + model: openai("gpt-4o"), + messages, + abortSignal: signal, + tools: { + research: ai.tool(researchTask), + }, + }); + }, +}); +``` + +On the frontend, render the custom data part: + +```tsx +{message.parts.map((part, i) => { + if (part.type === "data-research-status") { + const { query, status, resultCount } = part.data; + return ( +
+        <div key={i}>
+          {status === "done" ? `Found ${resultCount} results` : `Researching "${query}"...`}
+        </div>
+ ); + } + // ...other part types +})} +``` + +The `target` option accepts: +- `"self"` — current run (default) +- `"parent"` — parent task's run +- `"root"` — root task's run (the chat task) +- A specific run ID string + +--- + +## ai.tool() — subtask integration + +When a subtask runs via `ai.tool()`, it can access the tool call context and chat context from the parent: + +```ts +import { ai, chat } from "@trigger.dev/sdk/ai"; +import type { myChat } from "./chat"; + +export const mySubtask = schemaTask({ + id: "my-subtask", + schema: z.object({ query: z.string() }), + run: async ({ query }) => { + // Get the AI SDK's tool call ID (useful for data-* chunk IDs) + const toolCallId = ai.toolCallId(); + + // Get typed chat context — pass typeof yourChatTask for typed clientData + const { chatId, clientData } = ai.chatContextOrThrow(); + // clientData is typed based on myChat's clientDataSchema + + // Write a data chunk using the tool call ID + const { waitUntilComplete } = chat.stream.writer({ + target: "root", + execute: ({ write }) => { + write({ + type: "data-progress", + id: toolCallId, + data: { status: "working", query, userId: clientData?.userId }, + }); + }, + }); + await waitUntilComplete(); + + return { result: "done" }; + }, +}); +``` + +| Helper | Returns | Description | +|--------|---------|-------------| +| `ai.toolCallId()` | `string \| undefined` | The AI SDK tool call ID | +| `ai.chatContext()` | `{ chatId, turn, continuation, clientData } \| undefined` | Chat context with typed `clientData`. Returns `undefined` if not in a chat context. | +| `ai.chatContextOrThrow()` | `{ chatId, turn, continuation, clientData }` | Same as above but throws if not in a chat context | +| `ai.currentToolOptions()` | `ToolCallExecutionOptions \| undefined` | Full tool execution options | + +--- + +## Preload + +Preload eagerly triggers a run for a chat before the first message is sent. 
This allows initialization (DB setup, context loading) to happen while the user is still typing, reducing first-response latency. + +### Frontend + +Call `transport.preload(chatId)` to start a run early: + +```tsx +import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react"; +import { useChat } from "@ai-sdk/react"; + +export function Chat({ chatId }) { + const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: getChatToken, + clientData: { userId: currentUser.id }, + }); + + // Preload on mount — run starts before the user types anything + useEffect(() => { + transport.preload(chatId, { warmTimeoutInSeconds: 60 }); + }, [chatId]); + + const { messages, sendMessage } = useChat({ id: chatId, transport }); + // ... +} +``` + +Preload is a no-op if a session already exists for this chatId. + +### Backend + +On the backend, the `onPreload` hook fires immediately. The run then waits for the first message. When the user sends a message, `onChatStart` fires with `preloaded: true` — you can skip initialization that was already done in `onPreload`: + +```ts +export const myChat = chat.task({ + id: "my-chat", + onPreload: async ({ chatId, clientData }) => { + // Eagerly initialize — runs before the first message + userContext.init(await loadUser(clientData.userId)); + await db.chat.create({ data: { id: chatId } }); + }, + onChatStart: async ({ preloaded }) => { + if (preloaded) return; // Already initialized in onPreload + // ... fallback initialization for non-preloaded runs + }, + run: async ({ messages, signal }) => { + return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal }); + }, +}); +``` + +With `chat.createSession()` or raw tasks, check `payload.trigger === "preload"` and wait for the first message: + +```ts +if (payload.trigger === "preload") { + // Initialize early... 
+ const result = await chat.messages.waitWithWarmup({ + warmTimeoutInSeconds: 60, + timeout: "1h", + }); + if (!result.ok) return; + currentPayload = result.output; +} +``` diff --git a/docs/ai-chat/frontend.mdx b/docs/ai-chat/frontend.mdx new file mode 100644 index 0000000000..0e7854e4d5 --- /dev/null +++ b/docs/ai-chat/frontend.mdx @@ -0,0 +1,234 @@ +--- +title: "Frontend" +sidebarTitle: "Frontend" +description: "Transport setup, session management, client data, and frontend patterns for AI Chat." +--- + +## Transport setup + +Use the `useTriggerChatTransport` hook from `@trigger.dev/sdk/chat/react` to create a memoized transport instance, then pass it to `useChat`: + +```tsx +import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react"; +import { useChat } from "@ai-sdk/react"; +import type { myChat } from "@/trigger/chat"; +import { getChatToken } from "@/app/actions"; + +export function Chat() { + const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: getChatToken, + }); + + const { messages, sendMessage, stop, status } = useChat({ transport }); + // ... render UI +} +``` + +The transport is created once on first render and reused across re-renders. Pass a type parameter for compile-time validation of the task ID. + + + The hook keeps `onSessionChange` up to date via a ref internally, so you don't need to memoize the callback or worry about stale closures. + + +### Dynamic access tokens + +For token refresh, pass a function instead of a string. 
It's called on each `sendMessage`: + +```ts +const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: async () => { + const res = await fetch("/api/chat-token"); + return res.text(); + }, +}); +``` + +## Session management + +### Session cleanup (frontend) + +Since session creation and updates are handled server-side, the frontend only needs to handle session deletion when a run ends: + +```tsx +const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: getChatToken, + sessions: loadedSessions, // Restored from DB on page load + onSessionChange: (chatId, session) => { + if (!session) { + deleteSession(chatId); // Server action — run ended + } + }, +}); +``` + +### Restoring on page load + +On page load, fetch both the messages and the session from your database, then pass them to `useChat` and the transport. Pass `resume: true` to `useChat` when there's an existing conversation — this tells the AI SDK to reconnect to the stream via the transport. + +```tsx app/page.tsx +"use client"; + +import { useEffect, useState } from "react"; +import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react"; +import { useChat } from "@ai-sdk/react"; +import { getChatToken, getChatMessages, getSession, deleteSession } from "@/app/actions"; + +export default function ChatPage({ chatId }: { chatId: string }) { + const [initialMessages, setInitialMessages] = useState([]); + const [initialSession, setInitialSession] = useState(undefined); + const [loaded, setLoaded] = useState(false); + + useEffect(() => { + async function load() { + const [messages, session] = await Promise.all([ + getChatMessages(chatId), + getSession(chatId), + ]); + setInitialMessages(messages); + setInitialSession(session ? 
{ [chatId]: session } : undefined); + setLoaded(true); + } + load(); + }, [chatId]); + + if (!loaded) return null; + + return ( + + ); +} + +function ChatClient({ chatId, initialMessages, initialSessions }) { + const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: getChatToken, + sessions: initialSessions, + onSessionChange: (id, session) => { + if (!session) deleteSession(id); + }, + }); + + const { messages, sendMessage, stop, status } = useChat({ + id: chatId, + messages: initialMessages, + transport, + resume: initialMessages.length > 0, // Resume if there's an existing conversation + }); + + // ... render UI +} +``` + + + `resume: true` causes `useChat` to call `reconnectToStream` on the transport when the component mounts. The transport uses the session's `lastEventId` to skip past already-seen stream events, so the frontend only receives new data. Only enable `resume` when there are existing messages — for brand new chats, there's nothing to reconnect to. + + + + In React strict mode (enabled by default in Next.js dev), you may see a `TypeError: Cannot read properties of undefined (reading 'state')` in the console when using `resume`. This is a [known bug in the AI SDK](https://github.com/vercel/ai/issues/8477) caused by React strict mode double-firing the resume effect. The error is caught internally and **does not affect functionality** — streaming and message display work correctly. It only appears in development and will not occur in production builds. + + +## Client data and metadata + +### Transport-level client data + +Set default client data on the transport that's included in every request. When the task uses `clientDataSchema`, this is type-checked to match: + +```ts +const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: getChatToken, + clientData: { userId: currentUser.id }, +}); +``` + +### Per-message metadata + +Pass metadata with individual messages via `sendMessage`. 
Per-message values are merged with transport-level client data (per-message wins on conflicts): + +```ts +sendMessage( + { text: "Hello" }, + { metadata: { model: "gpt-4o", priority: "high" } } +); +``` + +### Typed client data with clientDataSchema + +Instead of manually parsing `clientData` with Zod in every hook, pass a `clientDataSchema` to `chat.task`. The schema validates the data once per turn, and `clientData` is typed in all hooks and `run`: + +```ts +import { chat } from "@trigger.dev/sdk/ai"; +import { streamText } from "ai"; +import { openai } from "@ai-sdk/openai"; +import { z } from "zod"; + +export const myChat = chat.task({ + id: "my-chat", + clientDataSchema: z.object({ + model: z.string().optional(), + userId: z.string(), + }), + onChatStart: async ({ chatId, clientData }) => { + // clientData is typed as { model?: string; userId: string } + await db.chat.create({ + data: { id: chatId, userId: clientData.userId }, + }); + }, + run: async ({ messages, clientData, signal }) => { + // Same typed clientData — no manual parsing needed + return streamText({ + model: openai(clientData?.model ?? "gpt-4o"), + messages, + abortSignal: signal, + }); + }, +}); +``` + +The schema also types the `clientData` option on the frontend transport: + +```ts +// TypeScript enforces that clientData matches the schema +const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: getChatToken, + clientData: { userId: currentUser.id }, +}); +``` + +Supports Zod, ArkType, Valibot, and other schema libraries supported by the SDK. + +## Stop generation + +Calling `stop()` from `useChat` sends a stop signal to the running task via input streams. 
The task aborts the current `streamText` call, but the run stays alive for the next message: + +```tsx +const { messages, sendMessage, stop, status } = useChat({ transport }); + +{status === "streaming" && ( + +)} +``` + +See [Stop generation](/ai-chat/backend#stop-generation) in the backend docs for how to handle stop signals in your task. + +## Self-hosting + +If you're self-hosting Trigger.dev, pass the `baseURL` option: + +```ts +const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken, + baseURL: "https://your-trigger-instance.com", +}); +``` diff --git a/docs/ai-chat/overview.mdx b/docs/ai-chat/overview.mdx new file mode 100644 index 0000000000..eb3d1ab23d --- /dev/null +++ b/docs/ai-chat/overview.mdx @@ -0,0 +1,161 @@ +--- +title: "AI Chat" +sidebarTitle: "Overview" +description: "Run AI SDK chat completions as durable Trigger.dev tasks with built-in realtime streaming, multi-turn conversations, and message persistence." +--- + +## Overview + +The `@trigger.dev/sdk` provides a custom [ChatTransport](https://sdk.vercel.ai/docs/ai-sdk-ui/transport) for the Vercel AI SDK's `useChat` hook. This lets you run chat completions as **durable Trigger.dev tasks** instead of fragile API routes — with automatic retries, observability, and realtime streaming built in. + +**How it works:** +1. The frontend sends messages via `useChat` through `TriggerChatTransport` +2. The first message triggers a Trigger.dev task; subsequent messages resume the **same run** via input streams +3. The task streams `UIMessageChunk` events back via Trigger.dev's realtime streams +4. The AI SDK's `useChat` processes the stream natively — text, tool calls, reasoning, etc. +5. Between turns, the run stays warm briefly then suspends (freeing compute) until the next message + +No custom API routes needed. Your chat backend is a Trigger.dev task. 
+ + + +### First message flow + +```mermaid +sequenceDiagram + participant User + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Task as chat.task Worker + participant LLM as LLM Provider + + User->>useChat: sendMessage("Hello") + useChat->>useChat: No session for chatId → trigger new run + useChat->>API: triggerTask(payload, tags: [chat:id]) + API-->>useChat: { runId, publicAccessToken } + useChat->>useChat: Store session, subscribe to SSE + + API->>Task: Start run with ChatTaskWirePayload + Task->>Task: onChatStart({ chatId, messages, clientData }) + Task->>Task: onTurnStart({ chatId, messages }) + Task->>LLM: streamText({ model, messages, abortSignal }) + LLM-->>Task: Stream response chunks + Task->>API: streams.pipe("chat", uiStream) + API-->>useChat: SSE: UIMessageChunks + useChat-->>User: Render streaming text + Task->>API: Write __trigger_turn_complete + API-->>useChat: SSE: turn complete + refreshed token + useChat->>useChat: Close stream, update session + Task->>Task: onTurnComplete({ messages, stopped: false }) + Task->>Task: Wait for next message (warm → suspend) +``` + +### Multi-turn flow + +```mermaid +sequenceDiagram + participant User + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Task as chat.task Worker + participant LLM as LLM Provider + + Note over Task: Suspended, waiting for message + + User->>useChat: sendMessage("Tell me more") + useChat->>useChat: Session exists → send via input stream + useChat->>API: sendInputStream(runId, "chat-messages", payload) + Note right of useChat: Only sends new message (not full history) + + API->>Task: Deliver to messagesInput + Task->>Task: Wake from suspend + Task->>Task: Append to accumulated messages + Task->>Task: onTurnStart({ turn: 1 }) + Task->>LLM: streamText({ messages: [all accumulated] }) + LLM-->>Task: Stream response + Task->>API: streams.pipe("chat", uiStream) + API-->>useChat: SSE: UIMessageChunks + 
useChat-->>User: Render streaming text + Task->>API: Write __trigger_turn_complete + Task->>Task: onTurnComplete({ turn: 1 }) + Task->>Task: Wait for next message (warm → suspend) +``` + +### Stop signal flow + +```mermaid +sequenceDiagram + participant User + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Task as chat.task Worker + participant LLM as LLM Provider + + Note over Task: Streaming response... + + User->>useChat: Click "Stop" + useChat->>API: sendInputStream(runId, "chat-stop", { stop: true }) + API->>Task: Deliver to stopInput + Task->>Task: stopController.abort() + LLM-->>Task: Stream ends (AbortError) + Task->>Task: cleanupAbortedParts(responseMessage) + Note right of Task: Remove partial tool calls,
mark streaming parts as done + Task->>API: Write __trigger_turn_complete + API-->>useChat: SSE: turn complete + Task->>Task: onTurnComplete({ stopped: true }) + Task->>Task: Wait for next message +``` + +
+ + + Requires `@trigger.dev/sdk` version **4.4.0 or later** and the `ai` package **v5.0.0 or later**. + + +## How multi-turn works + +### One run, many turns + +The entire conversation lives in a **single Trigger.dev run**. After each AI response, the run waits for the next message via input streams. The frontend transport handles this automatically — it triggers a new run for the first message, and sends subsequent messages to the existing run. + +This means your conversation has full observability in the Trigger.dev dashboard: every turn is a span inside the same run. + +### Warm and suspended states + +After each turn, the run goes through two phases of waiting: + +1. **Warm phase** (default 30s) — The run stays active and responds instantly to the next message. Uses compute. +2. **Suspended phase** (default up to 1h) — The run suspends, freeing compute. It wakes when the next message arrives. There's a brief delay as the run resumes. + +If no message arrives within the turn timeout, the run ends gracefully. The next message from the frontend will automatically start a fresh run. + + + You are not charged for compute during the suspended phase. Only the warm phase uses compute resources. + + +### What the backend accumulates + +The backend automatically accumulates the full conversation history across turns. After the first turn, the frontend transport only sends the new user message — not the entire history. This is handled transparently by the transport and task. 
+ +The accumulated messages are available in: +- `run()` as `messages` (`ModelMessage[]`) — for passing to `streamText` +- `onTurnStart()` as `uiMessages` (`UIMessage[]`) — for persisting before streaming +- `onTurnComplete()` as `uiMessages` (`UIMessage[]`) — for persisting after the response + +## Three approaches + +There are three ways to build the backend, from most opinionated to most flexible: + +| Approach | Use when | What you get | +|----------|----------|--------------| +| [chat.task()](/ai-chat/backend#chattask) | Most apps | Auto-piping, lifecycle hooks, message accumulation, stop handling | +| [chat.createSession()](/ai-chat/backend#chatcreatesession) | Need a loop but not hooks | Async iterator with per-turn helpers, message accumulation, stop handling | +| [Raw task + primitives](/ai-chat/backend#raw-task-with-primitives) | Full control | Manual control of every step — use `chat.messages`, `chat.createStopSignal()`, etc. | + +## Related + +- [Quick Start](/ai-chat/quick-start) — Get a working chat in 3 steps +- [Backend](/ai-chat/backend) — Backend approaches in detail +- [Frontend](/ai-chat/frontend) — Transport setup, sessions, client data +- [Features](/ai-chat/features) — Per-run data, deferred work, streaming, subtasks +- [API Reference](/ai-chat/reference) — Complete reference tables diff --git a/docs/ai-chat/quick-start.mdx b/docs/ai-chat/quick-start.mdx new file mode 100644 index 0000000000..b8245d9237 --- /dev/null +++ b/docs/ai-chat/quick-start.mdx @@ -0,0 +1,108 @@ +--- +title: "Quick Start" +sidebarTitle: "Quick Start" +description: "Get a working AI chat in 3 steps — define a task, generate a token, and wire up the frontend." +--- + + + + Use `chat.task` from `@trigger.dev/sdk/ai` to define a task that handles chat messages. The `run` function receives `ModelMessage[]` (already converted from the frontend's `UIMessage[]`) — pass them directly to `streamText`. 
+ + If you return a `StreamTextResult`, it's **automatically piped** to the frontend. + + ```ts trigger/chat.ts + import { chat } from "@trigger.dev/sdk/ai"; + import { streamText } from "ai"; + import { openai } from "@ai-sdk/openai"; + + export const myChat = chat.task({ + id: "my-chat", + run: async ({ messages, signal }) => { + // messages is ModelMessage[] — pass directly to streamText + // signal fires on stop or run cancel + return streamText({ + model: openai("gpt-4o"), + messages, + abortSignal: signal, + }); + }, + }); + ``` + + + + On your server (e.g. a Next.js server action), create a trigger public token scoped to your chat task: + + ```ts app/actions.ts + "use server"; + + import { chat } from "@trigger.dev/sdk/ai"; + import type { myChat } from "@/trigger/chat"; + + export const getChatToken = () => + chat.createAccessToken("my-chat"); + ``` + + + + Use the `useTriggerChatTransport` hook from `@trigger.dev/sdk/chat/react` to create a memoized transport instance, then pass it to `useChat`: + + ```tsx app/components/chat.tsx + "use client"; + + import { useChat } from "@ai-sdk/react"; + import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react"; + import type { myChat } from "@/trigger/chat"; + import { getChatToken } from "@/app/actions"; + + export function Chat() { + const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: getChatToken, + }); + + const { messages, sendMessage, stop, status } = useChat({ transport }); + + return ( +
+      <div>
+        {messages.map((m) => (
+          <div key={m.id}>
+            <strong>{m.role}:</strong>{" "}
+            {m.parts.map((part, i) =>
+              part.type === "text" ? <span key={i}>{part.text}</span> : null
+            )}
+          </div>
+        ))}
+
+        <form
+          onSubmit={(e) => {
+            e.preventDefault();
+            const input = e.currentTarget.querySelector("input");
+            if (input?.value) {
+              sendMessage({ text: input.value });
+              input.value = "";
+            }
+          }}
+        >
+          <input placeholder="Type a message..." />
+          <button type="submit">Send</button>
+          {status === "streaming" && (
+            <button type="button" onClick={() => stop()}>
+              Stop
+            </button>
+          )}
+        </form>
+      </div>
+    );
+  }
+  ```
+
+ +## Next steps + +- [Backend](/ai-chat/backend) — Lifecycle hooks, persistence, session iterator, raw task primitives +- [Frontend](/ai-chat/frontend) — Session management, client data, reconnection +- [Features](/ai-chat/features) — Per-run data, deferred work, streaming, subtasks diff --git a/docs/ai-chat/reference.mdx b/docs/ai-chat/reference.mdx new file mode 100644 index 0000000000..420decee98 --- /dev/null +++ b/docs/ai-chat/reference.mdx @@ -0,0 +1,257 @@ +--- +title: "API Reference" +sidebarTitle: "API Reference" +description: "Complete API reference for the AI Chat SDK — backend options, events, frontend transport, and hooks." +--- + +## ChatTaskOptions + +Options for `chat.task()`. + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `id` | `string` | required | Task identifier | +| `run` | `(payload: ChatTaskRunPayload) => Promise` | required | Handler for each turn | +| `clientDataSchema` | `TaskSchema` | — | Schema for validating and typing `clientData` | +| `onPreload` | `(event: PreloadEvent) => Promise \| void` | — | Fires on preloaded runs before the first message | +| `onChatStart` | `(event: ChatStartEvent) => Promise \| void` | — | Fires on turn 0 before `run()` | +| `onTurnStart` | `(event: TurnStartEvent) => Promise \| void` | — | Fires every turn before `run()` | +| `onTurnComplete` | `(event: TurnCompleteEvent) => Promise \| void` | — | Fires after each turn completes | +| `maxTurns` | `number` | `100` | Max conversational turns per run | +| `turnTimeout` | `string` | `"1h"` | How long to wait for next message | +| `warmTimeoutInSeconds` | `number` | `30` | Seconds to stay warm before suspending | +| `chatAccessTokenTTL` | `string` | `"1h"` | How long the scoped access token remains valid | +| `preloadWarmTimeoutInSeconds` | `number` | Same as `warmTimeoutInSeconds` | Warm timeout after `onPreload` fires | +| `preloadTimeout` | `string` | Same as `turnTimeout` | Suspend timeout for preloaded runs | +| 
`uiMessageStreamOptions` | `ChatUIMessageStreamOptions` | — | Default options for `toUIMessageStream()`. Per-turn override via `chat.setUIMessageStreamOptions()` | + +Plus all standard [TaskOptions](/tasks/overview) — `retry`, `queue`, `machine`, `maxDuration`, etc. + +## ChatTaskRunPayload + +The payload passed to the `run` function. + +| Field | Type | Description | +|-------|------|-------------| +| `messages` | `ModelMessage[]` | Model-ready messages — pass directly to `streamText` | +| `chatId` | `string` | Unique chat session ID | +| `trigger` | `"submit-message" \| "regenerate-message"` | What triggered the request | +| `messageId` | `string \| undefined` | Message ID (for regenerate) | +| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend (typed when schema is provided) | +| `continuation` | `boolean` | Whether this run is continuing an existing chat (previous run ended) | +| `signal` | `AbortSignal` | Combined stop + cancel signal | +| `cancelSignal` | `AbortSignal` | Cancel-only signal | +| `stopSignal` | `AbortSignal` | Stop-only signal (per-turn) | + +## PreloadEvent + +Passed to the `onPreload` callback. + +| Field | Type | Description | +|-------|------|-------------| +| `chatId` | `string` | Chat session ID | +| `runId` | `string` | The Trigger.dev run ID | +| `chatAccessToken` | `string` | Scoped access token for this run | +| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend | + +## ChatStartEvent + +Passed to the `onChatStart` callback. 
+ +| Field | Type | Description | +|-------|------|-------------| +| `chatId` | `string` | Chat session ID | +| `messages` | `ModelMessage[]` | Initial model-ready messages | +| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend | +| `runId` | `string` | The Trigger.dev run ID | +| `chatAccessToken` | `string` | Scoped access token for this run | +| `continuation` | `boolean` | Whether this run is continuing an existing chat | +| `previousRunId` | `string \| undefined` | Previous run ID (only when `continuation` is true) | +| `preloaded` | `boolean` | Whether this run was preloaded before the first message | + +## TurnStartEvent + +Passed to the `onTurnStart` callback. + +| Field | Type | Description | +|-------|------|-------------| +| `chatId` | `string` | Chat session ID | +| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) | +| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) | +| `turn` | `number` | Turn number (0-indexed) | +| `runId` | `string` | The Trigger.dev run ID | +| `chatAccessToken` | `string` | Scoped access token for this run | +| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend | +| `continuation` | `boolean` | Whether this run is continuing an existing chat | +| `previousRunId` | `string \| undefined` | Previous run ID (only when `continuation` is true) | +| `preloaded` | `boolean` | Whether this run was preloaded | + +## TurnCompleteEvent + +Passed to the `onTurnComplete` callback. 
+ +| Field | Type | Description | +|-------|------|-------------| +| `chatId` | `string` | Chat session ID | +| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) | +| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) | +| `newMessages` | `ModelMessage[]` | Only this turn's messages (model format) | +| `newUIMessages` | `UIMessage[]` | Only this turn's messages (UI format) | +| `responseMessage` | `UIMessage \| undefined` | The assistant's response for this turn | +| `rawResponseMessage` | `UIMessage \| undefined` | Raw response before abort cleanup | +| `turn` | `number` | Turn number (0-indexed) | +| `runId` | `string` | The Trigger.dev run ID | +| `chatAccessToken` | `string` | Scoped access token for this run | +| `lastEventId` | `string \| undefined` | Stream position for resumption | +| `stopped` | `boolean` | Whether the user stopped generation during this turn | +| `continuation` | `boolean` | Whether this run is continuing an existing chat | + +## ChatSessionOptions + +Options for `chat.createSession()`. + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `signal` | `AbortSignal` | required | Run-level cancel signal | +| `warmTimeoutInSeconds` | `number` | `30` | Seconds to stay warm between turns | +| `timeout` | `string` | `"1h"` | Duration string for suspend timeout | +| `maxTurns` | `number` | `100` | Max turns before ending | + +## ChatTurn + +Each turn yielded by `chat.createSession()`. 
+
+| Field | Type | Description |
+|-------|------|-------------|
+| `number` | `number` | Turn number (0-indexed) |
+| `chatId` | `string` | Chat session ID |
+| `trigger` | `string` | What triggered this turn |
+| `clientData` | `unknown` | Client data from the transport |
+| `messages` | `ModelMessage[]` | Full accumulated model messages |
+| `uiMessages` | `UIMessage[]` | Full accumulated UI messages |
+| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) |
+| `stopped` | `boolean` | Whether the user stopped generation this turn |
+| `continuation` | `boolean` | Whether this is a continuation run |
+
+| Method | Returns | Description |
+|--------|---------|-------------|
+| `complete(source)` | `Promise<void>` | Pipe, capture, accumulate, cleanup, and signal turn-complete |
+| `done()` | `Promise<void>` | Signal turn-complete (when you've piped manually) |
+| `addResponse(response)` | `Promise<void>` | Add response to accumulator manually |
+
+## chat namespace
+
+All methods available on the `chat` object from `@trigger.dev/sdk/ai`.
+
+| Method | Description |
+|--------|-------------|
+| `chat.task(options)` | Create a chat task |
+| `chat.createSession(payload, options)` | Create an async iterator for chat turns |
+| `chat.pipe(source, options?)` | Pipe a stream to the frontend (from anywhere inside a task) |
+| `chat.pipeAndCapture(source, options?)` | Pipe and capture the response `UIMessage` |
+| `chat.writeTurnComplete(options?)` | Signal the frontend that the current turn is complete |
+| `chat.createStopSignal()` | Create a managed stop signal wired to the stop input stream |
+| `chat.messages` | Input stream for incoming messages — use `.waitWithWarmup()` |
+| `chat.local({ id })` | Create a per-run typed local (see [Per-run data](/ai-chat/features#per-run-data-with-chatlocal)) |
+| `chat.createAccessToken(taskId)` | Create a public access token for a chat task |
+| `chat.setTurnTimeout(duration)` | Override turn timeout at runtime (e.g. `"2h"`) |
+| `chat.setTurnTimeoutInSeconds(seconds)` | Override turn timeout at runtime (in seconds) |
+| `chat.setWarmTimeoutInSeconds(seconds)` | Override warm timeout at runtime |
+| `chat.setUIMessageStreamOptions(options)` | Override `toUIMessageStream()` options for the current turn |
+| `chat.defer(promise)` | Run background work in parallel with streaming, awaited before `onTurnComplete` |
+| `chat.isStopped()` | Check if the current turn was stopped by the user |
+| `chat.cleanupAbortedParts(message)` | Remove incomplete parts from a stopped response message |
+| `chat.stream` | Typed chat output stream — use `.writer()`, `.pipe()`, `.append()`, `.read()` |
+| `chat.MessageAccumulator` | Class that accumulates conversation messages across turns |
+
+## ChatUIMessageStreamOptions
+
+Options for customizing `toUIMessageStream()`. Set as static defaults via `uiMessageStreamOptions` on `chat.task()`, or override per-turn via `chat.setUIMessageStreamOptions()`. See [Stream options](/ai-chat/backend#stream-options) for usage examples.
+
+Derived from the AI SDK's `UIMessageStreamOptions` with `onFinish`, `originalMessages`, and `generateMessageId` omitted (managed internally).
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| `onError` | `(error: unknown) => string` | Raw error message | Called on LLM errors and tool execution errors. Return a sanitized string — sent as `{ type: "error", errorText }` to the frontend. |
+| `sendReasoning` | `boolean` | `true` | Send reasoning parts to the client |
+| `sendSources` | `boolean` | `false` | Send source parts to the client |
+| `sendFinish` | `boolean` | `true` | Send the finish event. Set to `false` when chaining multiple `streamText` calls. |
+| `sendStart` | `boolean` | `true` | Send the message start event. Set to `false` when chaining. |
+| `messageMetadata` | `(options: { part }) => metadata` | — | Extract message metadata to send to the client. Called on `start` and `finish` events. |
+
+## TriggerChatTransport options
+
+Options for the frontend transport constructor and `useTriggerChatTransport` hook.
+
+| Option | Type | Default | Description |
+|--------|------|---------|-------------|
+| `task` | `string` | required | Task ID to trigger |
+| `accessToken` | `string \| (() => string \| Promise<string>)` | required | Auth token or function that returns one |
+| `baseURL` | `string` | `"https://api.trigger.dev"` | API base URL (for self-hosted) |
+| `streamKey` | `string` | `"chat"` | Stream key (only change if using custom key) |
+| `headers` | `Record<string, string>` | — | Extra headers for API requests |
+| `streamTimeoutSeconds` | `number` | `120` | How long to wait for stream data |
+| `clientData` | Typed by `clientDataSchema` | — | Default client data for every request |
+| `sessions` | `Record` | — | Restore sessions from storage |
+| `onSessionChange` | `(chatId, session \| null) => void` | — | Fires when session state changes |
+| `triggerOptions` | `{...}` | — | Options for the initial task trigger (see below) |
+
+### triggerOptions
+
+Options forwarded to the Trigger.dev API when starting a new run. Only applies to the first message — subsequent messages reuse the same run.
+
+A `chat:{chatId}` tag is automatically added to every run.
+
+| Option | Type | Description |
+|--------|------|-------------|
+| `tags` | `string[]` | Additional tags for the run (merged with auto-tags, max 5 total) |
+| `queue` | `string` | Queue name for the run |
+| `maxAttempts` | `number` | Maximum retry attempts |
+| `machine` | `"micro" \| "small-1x" \| ...` | Machine preset for the run |
+| `priority` | `number` | Priority time offset in seconds (higher values are dequeued first) |
+
+```ts
+const transport = useTriggerChatTransport({
+  task: "my-chat",
+  accessToken: getChatToken,
+  triggerOptions: {
+    tags: ["user:123"],
+    queue: "chat-queue",
+  },
+});
+```
+
+### transport.preload()
+
+Eagerly trigger a run before the first message.
+ +```ts +transport.preload(chatId, { warmTimeoutInSeconds?: number }): Promise +``` + +No-op if a session already exists for this chatId. See [Preload](/ai-chat/features#preload) for full details. + +## useTriggerChatTransport + +React hook that creates and memoizes a `TriggerChatTransport` instance. Import from `@trigger.dev/sdk/chat/react`. + +```tsx +import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react"; +import type { myChat } from "@/trigger/chat"; + +const transport = useTriggerChatTransport({ + task: "my-chat", + accessToken: () => getChatToken(), + sessions: savedSessions, + onSessionChange: handleSessionChange, +}); +``` + +The transport is created once on first render and reused across re-renders. Pass a type parameter for compile-time validation of the task ID. + +## Related + +- [Realtime Streams](/tasks/streams) — How streams work under the hood +- [Using the Vercel AI SDK](/guides/examples/vercel-ai-sdk) — Basic AI SDK usage with Trigger.dev +- [Realtime React Hooks](/realtime/react-hooks/overview) — Lower-level realtime hooks +- [Authentication](/realtime/auth) — Public access tokens and trigger tokens diff --git a/docs/docs.json b/docs/docs.json index 14d728e2db..79a66c3d35 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -80,6 +80,17 @@ "hidden-tasks" ] }, + { + "group": "AI Chat", + "pages": [ + "ai-chat/overview", + "ai-chat/quick-start", + "ai-chat/backend", + "ai-chat/frontend", + "ai-chat/features", + "ai-chat/reference" + ] + }, { "group": "Configuration", "pages": [ @@ -729,6 +740,10 @@ { "source": "/insights/metrics", "destination": "/observability/dashboards" + }, + { + "source": "/guides/ai-chat", + "destination": "/ai-chat/overview" } ] } diff --git a/references/ai-chat/ARCHITECTURE.md b/references/ai-chat/ARCHITECTURE.md new file mode 100644 index 0000000000..8adbc0c4a1 --- /dev/null +++ b/references/ai-chat/ARCHITECTURE.md @@ -0,0 +1,311 @@ +# AI Chat Architecture + +## System Overview + +```mermaid +graph TB + 
subgraph Frontend["Frontend (Browser)"] + UC[useChat Hook] + TCT[TriggerChatTransport] + UI[Chat UI Components] + end + + subgraph Platform["Trigger.dev Platform"] + API[REST API] + RS[Realtime Streams] + RE[Run Engine] + end + + subgraph Worker["Task Worker"] + CT[chat.task Turn Loop] + ST[streamText / AI SDK] + LLM[LLM Provider] + SUB[Subtasks via ai.tool] + end + + UI -->|user types| UC + UC -->|sendMessages| TCT + TCT -->|triggerTask / sendInputStream| API + API -->|queue run / deliver input| RE + RE -->|execute| CT + CT -->|call| ST + ST -->|API call| LLM + LLM -->|stream chunks| ST + ST -->|UIMessageChunks| RS + RS -->|SSE| TCT + TCT -->|ReadableStream| UC + UC -->|update| UI + CT -->|triggerAndWait| SUB + SUB -->|chat.stream target:root| RS +``` + +## Detailed Flow: New Chat (First Message) + +```mermaid +sequenceDiagram + participant User + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Task as chat.task Worker + participant LLM as LLM Provider + + User->>useChat: sendMessage("Hello") + useChat->>useChat: No session for chatId → trigger new run + + useChat->>API: triggerTask(payload, tags: [chat:id]) + API-->>useChat: { runId, publicAccessToken } + useChat->>useChat: Store session, subscribe to SSE + + API->>Task: Start run with ChatTaskWirePayload + + Note over Task: Preload phase skipped (trigger ≠ "preload") + + rect rgb(240, 248, 255) + Note over Task: Turn 0 + Task->>Task: convertToModelMessages(uiMessages) + Task->>Task: Mint access token + Task->>Task: onChatStart({ chatId, messages, clientData }) + Task->>Task: onTurnStart({ chatId, messages, uiMessages }) + Task->>LLM: streamText({ model, messages, abortSignal }) + LLM-->>Task: Stream response chunks + Task->>API: streams.pipe("chat", uiStream) + API-->>useChat: SSE: UIMessageChunks + useChat-->>User: Render streaming text + Task->>Task: onFinish → capturedResponseMessage + Task->>Task: Accumulate response in messages + Task->>API: Write 
__trigger_turn_complete chunk + API-->>useChat: SSE: { type: __trigger_turn_complete, publicAccessToken } + useChat->>useChat: Close stream, update session + Task->>Task: onTurnComplete({ messages, uiMessages, stopped }) + end + + rect rgb(255, 248, 240) + Note over Task: Wait for next message + Task->>Task: messagesInput.once() [warm, 30s] + Note over Task: No message → suspend + Task->>Task: messagesInput.wait() [suspended, 1h] + end +``` + +## Detailed Flow: Multi-Turn (Subsequent Messages) + +```mermaid +sequenceDiagram + participant User + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Task as chat.task Worker + participant LLM as LLM Provider + + Note over Task: Suspended, waiting for message + + User->>useChat: sendMessage("Tell me more") + useChat->>useChat: Session exists → send via input stream + useChat->>API: sendInputStream(runId, "chat-messages", payload) + Note right of useChat: Only sends new message
(not full history) + + API->>Task: Deliver to messagesInput + Task->>Task: Wake from suspend + + rect rgb(240, 248, 255) + Note over Task: Turn 1 + Task->>Task: Append new message to accumulators + Task->>Task: Mint fresh access token + Task->>Task: onTurnStart({ turn: 1, messages }) + Task->>LLM: streamText({ messages: [all accumulated] }) + LLM-->>Task: Stream response + Task->>API: streams.pipe("chat", uiStream) + API-->>useChat: SSE: UIMessageChunks + useChat-->>User: Render streaming text + Task->>API: Write __trigger_turn_complete + Task->>Task: onTurnComplete({ turn: 1 }) + end + + Task->>Task: Wait for next message (warm → suspend) +``` + +## Stop Signal Flow + +```mermaid +sequenceDiagram + participant User + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Task as chat.task Worker + participant LLM as LLM Provider + + Note over Task: Streaming response... + + User->>useChat: Click "Stop" + useChat->>API: sendInputStream(runId, "chat-stop", { stop: true }) + useChat->>useChat: Set skipToTurnComplete = true + + API->>Task: Deliver to stopInput + Task->>Task: stopController.abort() + Task->>LLM: AbortSignal fires + LLM-->>Task: Stream ends (AbortError) + Task->>Task: Catch AbortError, fall through + Task->>Task: await onFinishPromise (race condition fix) + Task->>Task: cleanupAbortedParts(responseMessage) + Note right of Task: Remove partial tool calls
Mark streaming parts as done + + Task->>API: Write __trigger_turn_complete + API-->>useChat: SSE: __trigger_turn_complete + useChat->>useChat: skipToTurnComplete = false, close stream + + Task->>Task: onTurnComplete({ stopped: true, responseMessage: cleaned }) + Task->>Task: Wait for next message +``` + +## Preload Flow + +```mermaid +sequenceDiagram + participant User + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Task as chat.task Worker + + User->>useChat: Click "New Chat" + useChat->>API: transport.preload(chatId) + Note right of useChat: payload: { messages: [], trigger: "preload" }
tags: [chat:id, preload:true] + API-->>useChat: { runId, publicAccessToken } + useChat->>useChat: Store session + + API->>Task: Start run (trigger = "preload") + + rect rgb(240, 255, 240) + Note over Task: Preload Phase + Task->>Task: Mint access token + Task->>Task: onPreload({ chatId, clientData }) + Note right of Task: DB setup, load user context,
load dynamic tools + Task->>Task: messagesInput.once() [warm] + Note over Task: Waiting for first message... + end + + Note over User: User is typing... + + User->>useChat: sendMessage("Hello") + useChat->>useChat: Session exists → send via input stream + useChat->>API: sendInputStream(runId, "chat-messages", payload) + API->>Task: Deliver message + + rect rgb(240, 248, 255) + Note over Task: Turn 0 (preloaded = true) + Task->>Task: onChatStart({ preloaded: true }) + Task->>Task: onTurnStart({ preloaded: true }) + Task->>Task: run() with preloaded dynamic tools ready + end +``` + +## Subtask Streaming (Tool as Task) + +```mermaid +sequenceDiagram + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Chat as chat.task + participant LLM as LLM Provider + participant Sub as Subtask (ai.tool) + + Chat->>LLM: streamText({ tools: { research: ai.tool(task) } }) + LLM-->>Chat: Tool call: research({ query, urls }) + + Chat->>API: triggerAndWait(subtask, input) + Note right of Chat: Passes toolCallId, chatId,
clientData via metadata + + API->>Sub: Start subtask + + Sub->>Sub: ai.chatContextOrThrow() → { chatId, clientData } + Sub->>API: chat.stream.writer({ target: "root" }) + Note right of Sub: Write data-research-progress
chunks to parent's stream + API-->>useChat: SSE: data-* chunks + useChat-->>useChat: Render progress UI + + Sub-->>Chat: Return result + Chat->>LLM: Tool result + LLM-->>Chat: Continue response +``` + +## Continuation Flow (Run Timeout / Cancel) + +```mermaid +sequenceDiagram + participant User + participant useChat as useChat + Transport + participant API as Trigger.dev API + participant Task as chat.task Worker + + Note over Task: Previous run timed out / was cancelled + + User->>useChat: sendMessage("Continue") + useChat->>API: sendInputStream(runId, payload) + API-->>useChat: Error (run dead) + + useChat->>useChat: Delete session, set isContinuation = true + useChat->>API: triggerTask(payload, continuation: true, previousRunId) + API-->>useChat: New { runId, publicAccessToken } + + API->>Task: Start new run + + rect rgb(255, 245, 238) + Note over Task: Turn 0 (continuation = true) + Task->>Task: cleanupAbortedParts(incoming messages) + Note right of Task: Strip incomplete tool calls
from previous run's response + Task->>Task: onChatStart({ continuation: true, previousRunId }) + Task->>Task: Normal turn flow... + end +``` + +## Hook Lifecycle + +```mermaid +graph TD + START([Run Starts]) --> IS_PRELOAD{trigger = preload?} + + IS_PRELOAD -->|Yes| PRELOAD[onPreload] + PRELOAD --> WAIT_MSG[Wait for first message
warm → suspend] + WAIT_MSG --> TURN0 + + IS_PRELOAD -->|No| TURN0 + + TURN0[Turn 0] --> CHAT_START[onChatStart
continuation, preloaded] + CHAT_START --> TURN_START_0[onTurnStart] + TURN_START_0 --> RUN_0[run → streamText] + RUN_0 --> TURN_COMPLETE_0[onTurnComplete
stopped, responseMessage] + + TURN_COMPLETE_0 --> WAIT{Wait for
next message} + WAIT -->|Message arrives| TURN_N[Turn N] + WAIT -->|Timeout| END_RUN([Run Ends]) + + TURN_N --> TURN_START_N[onTurnStart] + TURN_START_N --> RUN_N[run → streamText] + RUN_N --> TURN_COMPLETE_N[onTurnComplete] + TURN_COMPLETE_N --> WAIT +``` + +## Stream Architecture + +```mermaid +graph LR + subgraph Output["Output Stream (chat)"] + direction TB + O1[UIMessageChunks
text, reasoning, tools] + O2[data-* custom chunks] + O3[__trigger_turn_complete
control chunk] + end + + subgraph Input["Input Streams"] + direction TB + I1[chat-messages
User messages] + I2[chat-stop
Stop signal] + end + + Frontend -->|sendInputStream| I1 + Frontend -->|sendInputStream| I2 + I1 -->|messagesInput.once/wait| Worker + I2 -->|stopInput.on| Worker + Worker -->|streams.pipe / chat.stream| Output + Subtask -->|chat.stream target:root| Output + Output -->|SSE /realtime/v1/streams| Frontend +```