From 7dc2c0f968395c1c5de04ac79c27a9b7949d3cb5 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 26 Apr 2026 21:09:12 +0000 Subject: [PATCH 01/10] feat(ai-client): support TanStack Start server functions in stream() adapter The stream() factory now accepts Promise> and Promise in addition to the existing AsyncIterable shape, so a TanStack Start server function (which is just an async API endpoint) can be wired directly into useChat without a route handler: useChat({ connection: stream((messages) => chatFn({ data: { messages } })), }) When the factory returns a Response (e.g. via toServerSentEventsResponse), the adapter parses the SSE body into chunks. rpcStream() likewise accepts a Promise-returning RPC call. Adds unit tests for both new shapes and a docs section in chat/connection-adapters.md. --- .changeset/stream-adapter-server-functions.md | 23 +++ docs/chat/connection-adapters.md | 56 +++++++ .../ai-client/src/connection-adapters.ts | 138 ++++++++++++------ packages/typescript/ai-client/src/index.ts | 1 + .../tests/connection-adapters.test.ts | 115 +++++++++++++++ 5 files changed, 291 insertions(+), 42 deletions(-) create mode 100644 .changeset/stream-adapter-server-functions.md diff --git a/.changeset/stream-adapter-server-functions.md b/.changeset/stream-adapter-server-functions.md new file mode 100644 index 000000000..ae024ca65 --- /dev/null +++ b/.changeset/stream-adapter-server-functions.md @@ -0,0 +1,23 @@ +--- +'@tanstack/ai-client': minor +--- + +feat(ai-client): support TanStack Start server functions in `stream()` connection adapter + +The `stream()` factory now accepts any of three return shapes, so a server function (which is just an async API endpoint) can be wired directly into `useChat`: + +- `AsyncIterable` — direct in-process stream (existing behavior) +- `Promise>` — server function returning the chat stream +- `Promise` — server function returning `toServerSentEventsResponse(stream)` + +`rpcStream()` likewise accepts a `Promise>`. + +```ts +const chatFn = createServerFn({ method: 'POST' }) + .inputValidator((data: { messages: Array }) => data) + .handler(({ data }) => + toServerSentEventsResponse(chat({ adapter, messages: data.messages })), + ) + +useChat({ connection: stream((messages) => chatFn({ data: { messages } })) }) +``` diff --git a/docs/chat/connection-adapters.md b/docs/chat/connection-adapters.md index 70329125c..7dbe080b8 100644 --- a/docs/chat/connection-adapters.md +++ b/docs/chat/connection-adapters.md @@ -72,6 +72,62 @@ const { messages } = useChat({ }); ``` +### TanStack Start Server Functions + +A server function is just a fancy API endpoint, so `stream()` adapts one into a `useChat` connection. The factory you pass to `stream()` may return either the chat `AsyncIterable` directly, or an SSE `Response` produced by `toServerSentEventsResponse()` — both shapes work because `stream()` awaits the result and unwraps a `Response` if it sees one. + +#### Returning an SSE Response (recommended) + +Wrap the chat stream in `toServerSentEventsResponse()` so only encoded bytes flow over the wire. The client parses the SSE automatically: + +```typescript +// server-fns.ts +import { createServerFn } from "@tanstack/react-start"; +import { chat, toServerSentEventsResponse } from "@tanstack/ai"; +import { openaiText } from "@tanstack/ai-openai"; +import type { UIMessage } from "@tanstack/ai"; + +export const chatFn = createServerFn({ method: "POST" }) + .inputValidator((data: { messages: Array }) => data) + .handler(({ data }) => + toServerSentEventsResponse( + chat({ + adapter: openaiText("gpt-4o"), + messages: data.messages, + }), + ), + ); +``` + +```tsx +// client +import { useChat, stream } from "@tanstack/ai-react"; +import { chatFn } from "./server-fns"; + +const { messages, sendMessage } = useChat({ + connection: stream((messages) => chatFn({ data: { messages } })), +}); +``` + +#### Returning the AsyncIterable directly + +If you don't want to encode an HTTP response, return the chat stream itself. `stream()` awaits the server function and yields chunks straight through: + +```typescript +// server-fns.ts +export const chatFn = createServerFn({ method: "POST" }) + .inputValidator((data: { messages: Array }) => data) + .handler(({ data }) => + chat({ adapter: openaiText("gpt-4o"), messages: data.messages }), + ); +``` + +```tsx +const { messages, sendMessage } = useChat({ + connection: stream((messages) => chatFn({ data: { messages } })), +}); +``` + ## Custom Adapters For specialized use cases, you can create custom adapters to meet specific protocols or requirements: diff --git a/packages/typescript/ai-client/src/connection-adapters.ts b/packages/typescript/ai-client/src/connection-adapters.ts index 182bddbbb..0ad8c338d 100644 --- a/packages/typescript/ai-client/src/connection-adapters.ts +++ b/packages/typescript/ai-client/src/connection-adapters.ts @@ -19,6 +19,43 @@ function mergeHeaders( return customHeaders } +/** + * Parse SSE-formatted lines into StreamChunks. + * Handles both `data: {...}` lines and bare JSON lines, and skips `[DONE]`. + */ +async function* parseSSEChunks( + lines: AsyncIterable, +): AsyncGenerator { + for await (const line of lines) { + const data = line.startsWith('data: ') ? line.slice(6) : line + if (data === '[DONE]') continue + try { + yield JSON.parse(data) as StreamChunk + } catch { + console.warn('Failed to parse SSE chunk:', data) + } + } +} + +/** + * Yield StreamChunks from a Response body parsed as SSE. + */ +async function* responseToSSEChunks( + response: Response, + abortSignal?: AbortSignal, +): AsyncGenerator { + if (!response.ok) { + throw new Error( + `HTTP error! status: ${response.status} ${response.statusText}`, + ) + } + const reader = response.body?.getReader() + if (!reader) { + throw new Error('Response body is not readable') + } + yield* parseSSEChunks(readStreamLines(reader, abortSignal)) +} + /** * Read lines from a stream (newline-delimited) */ @@ -294,32 +331,7 @@ export function fetchServerSentEvents( signal: abortSignal || resolvedOptions.signal, }) - if (!response.ok) { - throw new Error( - `HTTP error! status: ${response.status} ${response.statusText}`, - ) - } - - // Parse Server-Sent Events format - const reader = response.body?.getReader() - if (!reader) { - throw new Error('Response body is not readable') - } - - for await (const line of readStreamLines(reader, abortSignal)) { - // Handle Server-Sent Events format - const data = line.startsWith('data: ') ? line.slice(6) : line - - if (data === '[DONE]') continue - - try { - const parsed: StreamChunk = JSON.parse(data) - yield parsed - } catch (parseError) { - // Skip non-JSON lines or malformed chunks - console.warn('Failed to parse SSE chunk:', data) - } - } + yield* responseToSSEChunks(response, abortSignal) }, } } @@ -418,36 +430,79 @@ export function fetchHttpStream( } /** - * Create a direct stream connection adapter (for server functions or direct streams) + * Result shapes a `stream()` factory may return. + * + * - `AsyncIterable` — a direct in-process stream (e.g. `chat()`). + * - `Promise>` — a TanStack Start server function + * whose handler returns the chat stream directly. + * - `Promise` — a server function whose handler returns + * `toServerSentEventsResponse(stream)` (or any HTTP endpoint returning SSE). + */ +export type StreamFactoryResult = + | AsyncIterable + | Promise | Response> + +/** + * Create a direct stream connection adapter. + * + * Accepts any of: + * + * 1. An in-process async iterable factory (returns `AsyncIterable`). + * 2. A TanStack Start server function whose handler returns the chat stream + * (returns `Promise>`). + * 3. A TanStack Start server function whose handler returns an SSE `Response` + * via `toServerSentEventsResponse(stream)` (returns `Promise`). * - * @param streamFactory - A function that returns an async iterable of StreamChunks - * @returns A connection adapter for direct streams + * Server functions are just async API endpoints — the third shape is the + * recommended pattern when you want to keep network bytes small (SSE) and + * still get end-to-end type safety from the server function call. + * + * @param streamFactory - Function called per request; returns one of the shapes above. * * @example * ```typescript - * // With TanStack Start server function - * const connection = stream(() => serverFunction({ messages })); + * // 1. In-process async iterable (e.g. tests, in-memory loop) + * useChat({ connection: stream(async function* () { yield ... }) }) * - * const client = new ChatClient({ connection }); + * // 2. Server function returning the chat stream directly + * const chatFn = createServerFn({ method: 'POST' }) + * .inputValidator((data: { messages: UIMessage[] }) => data) + * .handler(({ data }) => chat({ adapter, messages: data.messages })) + * + * useChat({ connection: stream((messages) => chatFn({ data: { messages } })) }) + * + * // 3. Server function returning an SSE Response (recommended) + * const chatFn = createServerFn({ method: 'POST' }) + * .inputValidator((data: { messages: UIMessage[] }) => data) + * .handler(({ data }) => + * toServerSentEventsResponse(chat({ adapter, messages: data.messages })), + * ) + * + * useChat({ connection: stream((messages) => chatFn({ data: { messages } })) }) * ``` */ export function stream( streamFactory: ( messages: Array | Array, data?: Record, - ) => AsyncIterable, + ) => StreamFactoryResult, ): ConnectConnectionAdapter { return { - async *connect(messages, data) { - // Pass messages as-is (UIMessages with parts preserved) - // Server-side chat() handles conversion to ModelMessages - yield* streamFactory(messages, data) + async *connect(messages, data, abortSignal) { + const result = await streamFactory(messages, data) + if (result instanceof Response) { + yield* responseToSSEChunks(result, abortSignal) + } else { + yield* result + } }, } } /** - * Create an RPC stream connection adapter (for RPC-based streaming like Cap'n Web RPC) + * Create an RPC stream connection adapter (for RPC-based streaming like Cap'n Web RPC). + * + * The RPC call may return the async iterable synchronously or as a Promise. * * @param rpcCall - A function that accepts messages and returns an async iterable of StreamChunks * @returns A connection adapter for RPC streams @@ -466,13 +521,12 @@ export function rpcStream( rpcCall: ( messages: Array | Array, data?: Record, - ) => AsyncIterable, + ) => AsyncIterable | Promise>, ): ConnectConnectionAdapter { return { async *connect(messages, data) { - // Pass messages as-is (UIMessages with parts preserved) - // Server-side chat() handles conversion to ModelMessages - yield* rpcCall(messages, data) + const iterable = await rpcCall(messages, data) + yield* iterable }, } } diff --git a/packages/typescript/ai-client/src/index.ts b/packages/typescript/ai-client/src/index.ts index 93654ba66..2ab7c5071 100644 --- a/packages/typescript/ai-client/src/index.ts +++ b/packages/typescript/ai-client/src/index.ts @@ -59,6 +59,7 @@ export { type ConnectConnectionAdapter, type ConnectionAdapter, type FetchConnectionOptions, + type StreamFactoryResult, type SubscribeConnectionAdapter, } from './connection-adapters' diff --git a/packages/typescript/ai-client/tests/connection-adapters.test.ts b/packages/typescript/ai-client/tests/connection-adapters.test.ts index a5addde26..f67bdb26a 100644 --- a/packages/typescript/ai-client/tests/connection-adapters.test.ts +++ b/packages/typescript/ai-client/tests/connection-adapters.test.ts @@ -827,6 +827,90 @@ describe('connection-adapters', () => { data, ) }) + + it('should await Promise from a server function', async () => { + // Simulates a TanStack Start server function whose handler returns an + // async iterable directly: `createServerFn().handler(() => chat({...}))`. + async function* serverStream(): AsyncGenerator { + yield { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-1', + model: 'test', + timestamp: Date.now(), + delta: 'Hi', + content: 'Hi', + } + yield { + type: 'RUN_FINISHED', + runId: 'run-1', + model: 'test', + timestamp: Date.now(), + finishReason: 'stop', + } + } + const serverFn = vi.fn( + async ( + _messages: Parameters[0]>[0], + _data?: Parameters[0]>[1], + ) => serverStream(), + ) + + const adapter = stream((messages, data) => serverFn(messages, data)) + const chunks: Array = [] + for await (const chunk of adapter.connect([ + { role: 'user', content: 'Hello' }, + ])) { + chunks.push(chunk) + } + + expect(serverFn).toHaveBeenCalled() + expect(chunks).toHaveLength(2) + expect(chunks[0]?.type).toBe('TEXT_MESSAGE_CONTENT') + expect(chunks[1]?.type).toBe('RUN_FINISHED') + }) + + it('should parse Promise SSE body from a server function', async () => { + // Simulates a server function whose handler returns + // `toServerSentEventsResponse(chat({...}))`. + const sseBody = [ + 'data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"msg-1","model":"test","timestamp":1,"delta":"Hi","content":"Hi"}\n\n', + 'data: {"type":"RUN_FINISHED","runId":"run-1","model":"test","timestamp":2,"finishReason":"stop"}\n\n', + 'data: [DONE]\n\n', + ].join('') + const serverFn = vi.fn(async () => new Response(sseBody, { status: 200 })) + + const adapter = stream(() => serverFn()) + const chunks: Array = [] + for await (const chunk of adapter.connect([ + { role: 'user', content: 'Hello' }, + ])) { + chunks.push(chunk) + } + + expect(serverFn).toHaveBeenCalled() + expect(chunks).toHaveLength(2) + expect(chunks[0]?.type).toBe('TEXT_MESSAGE_CONTENT') + expect(chunks[1]?.type).toBe('RUN_FINISHED') + }) + + it('should throw when Response from server function is not ok', async () => { + const serverFn = vi.fn( + async () => + new Response('boom', { status: 500, statusText: 'Server Error' }), + ) + + const adapter = stream(() => serverFn()) + + await expect( + (async () => { + for await (const _ of adapter.connect([ + { role: 'user', content: 'Hello' }, + ])) { + // Consume + } + })(), + ).rejects.toThrow('HTTP error! status: 500 Server Error') + }) }) describe('normalizeConnectionAdapter', () => { @@ -1014,5 +1098,36 @@ describe('connection-adapters', () => { data, ) }) + + it('should await Promise from RPC call', async () => { + async function* rpcStreamGen(): AsyncGenerator { + yield { + type: 'TEXT_MESSAGE_CONTENT', + messageId: 'msg-1', + model: 'test', + timestamp: Date.now(), + delta: 'Hi', + content: 'Hi', + } + } + const rpcCall = vi.fn( + async ( + _messages: Parameters[0]>[0], + _data?: Parameters[0]>[1], + ) => rpcStreamGen(), + ) + + const adapter = rpcStream((messages, data) => rpcCall(messages, data)) + const chunks: Array = [] + for await (const chunk of adapter.connect([ + { role: 'user', content: 'Hello' }, + ])) { + chunks.push(chunk) + } + + expect(rpcCall).toHaveBeenCalled() + expect(chunks).toHaveLength(1) + expect(chunks[0]?.type).toBe('TEXT_MESSAGE_CONTENT') + }) }) }) From 61cd08e2cd933336bff4e85101a836c5094e40f7 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 26 Apr 2026 21:54:22 +0000 Subject: [PATCH 02/10] docs(ts-react-chat): add server function chat example MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a working /server-fn-chat route that wires useChat to a TanStack Start server function via the stream() connection adapter: useChat({ connection: stream((messages) => chatFn({ data: { messages: messages as UIMessage[] } }), ), }) The new chatFn handler in lib/server-fns.ts returns toServerSentEventsResponse(chat({ ... })) — the stream() adapter awaits the server function and parses the SSE response into chunks. Sits alongside the existing index.tsx pattern (fetchServerSentEvents to a route handler) so users can compare the two invocation styles. --- .../ts-react-chat/src/components/Header.tsx | 14 +++ examples/ts-react-chat/src/lib/server-fns.ts | 26 +++++ examples/ts-react-chat/src/routeTree.gen.ts | 21 ++++ .../src/routes/server-fn-chat.tsx | 110 ++++++++++++++++++ 4 files changed, 171 insertions(+) create mode 100644 examples/ts-react-chat/src/routes/server-fn-chat.tsx diff --git a/examples/ts-react-chat/src/components/Header.tsx b/examples/ts-react-chat/src/components/Header.tsx index 0b28cbc48..157489ae7 100644 --- a/examples/ts-react-chat/src/components/Header.tsx +++ b/examples/ts-react-chat/src/components/Header.tsx @@ -9,6 +9,7 @@ import { Image, Menu, Mic, + Server, Video, X, } from 'lucide-react' @@ -169,6 +170,19 @@ export default function Header() { Voice Chat (Realtime) + + setIsOpen(false)} + className="flex items-center gap-3 p-3 rounded-lg hover:bg-gray-800 transition-colors mb-2" + activeProps={{ + className: + 'flex items-center gap-3 p-3 rounded-lg bg-cyan-600 hover:bg-cyan-700 transition-colors mb-2', + }} + > + + Server Function Chat + diff --git a/examples/ts-react-chat/src/lib/server-fns.ts b/examples/ts-react-chat/src/lib/server-fns.ts index 624de8e89..69e10e216 100644 --- a/examples/ts-react-chat/src/lib/server-fns.ts +++ b/examples/ts-react-chat/src/lib/server-fns.ts @@ -1,6 +1,7 @@ import { createServerFn } from '@tanstack/react-start' import { z } from 'zod' import { + chat, generateImage, generateSpeech, generateTranscription, @@ -12,10 +13,12 @@ import { import { openaiImage, openaiSpeech, + openaiText, openaiTranscription, openaiSummarize, openaiVideo, } from '@tanstack/ai-openai' +import type { UIMessage } from '@tanstack/ai' // ============================================================================= // Direct server functions (non-streaming, return the result directly) @@ -235,3 +238,26 @@ export const generateVideoStreamFn = createServerFn({ method: 'POST' }) }), ) }) + +// ============================================================================= +// Chat server function (streams via SSE Response) +// Used with: stream((messages) => chatFn({ data: { messages } })) +// ============================================================================= + +export const chatFn = createServerFn({ method: 'POST' }) + .inputValidator( + (data: { messages: Array; data?: Record }) => data, + ) + .handler(({ data }) => + toServerSentEventsResponse( + chat({ + adapter: openaiText('gpt-4o'), + // chat() converts UIMessage[] to ModelMessage[] internally, but the + // exported type signature only accepts ModelMessage[] — cast to any. + messages: data.messages as any, + systemPrompts: [ + 'You are a helpful assistant. Keep replies short and friendly.', + ], + }), + ), + ) diff --git a/examples/ts-react-chat/src/routeTree.gen.ts b/examples/ts-react-chat/src/routeTree.gen.ts index 490145527..124b11e74 100644 --- a/examples/ts-react-chat/src/routeTree.gen.ts +++ b/examples/ts-react-chat/src/routeTree.gen.ts @@ -9,6 +9,7 @@ // Additionally, you should also exclude this file from your linter and/or formatter to prevent it from being checked or modified. import { Route as rootRouteImport } from './routes/__root' +import { Route as ServerFnChatRouteImport } from './routes/server-fn-chat' import { Route as RealtimeRouteImport } from './routes/realtime' import { Route as ImageGenRouteImport } from './routes/image-gen' import { Route as IndexRouteImport } from './routes/index' @@ -27,6 +28,11 @@ import { Route as ApiGenerateVideoRouteImport } from './routes/api.generate.vide import { Route as ApiGenerateSpeechRouteImport } from './routes/api.generate.speech' import { Route as ApiGenerateImageRouteImport } from './routes/api.generate.image' +const ServerFnChatRoute = ServerFnChatRouteImport.update({ + id: '/server-fn-chat', + path: '/server-fn-chat', + getParentRoute: () => rootRouteImport, +} as any) const RealtimeRoute = RealtimeRouteImport.update({ id: '/realtime', path: '/realtime', @@ -118,6 +124,7 @@ export interface FileRoutesByFullPath { '/': typeof IndexRoute '/image-gen': typeof ImageGenRoute '/realtime': typeof RealtimeRoute + '/server-fn-chat': typeof ServerFnChatRoute '/api/image-gen': typeof ApiImageGenRoute '/api/summarize': typeof ApiSummarizeRoute '/api/tanchat': typeof ApiTanchatRoute @@ -137,6 +144,7 @@ export interface FileRoutesByTo { '/': typeof IndexRoute '/image-gen': typeof ImageGenRoute '/realtime': typeof RealtimeRoute + '/server-fn-chat': typeof ServerFnChatRoute '/api/image-gen': typeof ApiImageGenRoute '/api/summarize': typeof ApiSummarizeRoute '/api/tanchat': typeof ApiTanchatRoute @@ -157,6 +165,7 @@ export interface FileRoutesById { '/': typeof IndexRoute '/image-gen': typeof ImageGenRoute '/realtime': typeof RealtimeRoute + '/server-fn-chat': typeof ServerFnChatRoute '/api/image-gen': typeof ApiImageGenRoute '/api/summarize': typeof ApiSummarizeRoute '/api/tanchat': typeof ApiTanchatRoute @@ -178,6 +187,7 @@ export interface FileRouteTypes { | '/' | '/image-gen' | '/realtime' + | '/server-fn-chat' | '/api/image-gen' | '/api/summarize' | '/api/tanchat' @@ -197,6 +207,7 @@ export interface FileRouteTypes { | '/' | '/image-gen' | '/realtime' + | '/server-fn-chat' | '/api/image-gen' | '/api/summarize' | '/api/tanchat' @@ -216,6 +227,7 @@ export interface FileRouteTypes { | '/' | '/image-gen' | '/realtime' + | '/server-fn-chat' | '/api/image-gen' | '/api/summarize' | '/api/tanchat' @@ -236,6 +248,7 @@ export interface RootRouteChildren { IndexRoute: typeof IndexRoute ImageGenRoute: typeof ImageGenRoute RealtimeRoute: typeof RealtimeRoute + ServerFnChatRoute: typeof ServerFnChatRoute ApiImageGenRoute: typeof ApiImageGenRoute ApiSummarizeRoute: typeof ApiSummarizeRoute ApiTanchatRoute: typeof ApiTanchatRoute @@ -254,6 +267,13 @@ export interface RootRouteChildren { declare module '@tanstack/react-router' { interface FileRoutesByPath { + '/server-fn-chat': { + id: '/server-fn-chat' + path: '/server-fn-chat' + fullPath: '/server-fn-chat' + preLoaderRoute: typeof ServerFnChatRouteImport + parentRoute: typeof rootRouteImport + } '/realtime': { id: '/realtime' path: '/realtime' @@ -380,6 +400,7 @@ const rootRouteChildren: RootRouteChildren = { IndexRoute: IndexRoute, ImageGenRoute: ImageGenRoute, RealtimeRoute: RealtimeRoute, + ServerFnChatRoute: ServerFnChatRoute, ApiImageGenRoute: ApiImageGenRoute, ApiSummarizeRoute: ApiSummarizeRoute, ApiTanchatRoute: ApiTanchatRoute, diff --git a/examples/ts-react-chat/src/routes/server-fn-chat.tsx b/examples/ts-react-chat/src/routes/server-fn-chat.tsx new file mode 100644 index 000000000..9d5ffd506 --- /dev/null +++ b/examples/ts-react-chat/src/routes/server-fn-chat.tsx @@ -0,0 +1,110 @@ +import { useState } from 'react' +import { createFileRoute } from '@tanstack/react-router' +import { stream, useChat } from '@tanstack/ai-react' +import { Send, Square } from 'lucide-react' +import { chatFn } from '@/lib/server-fns' +import type { UIMessage } from '@tanstack/ai' + +export const Route = createFileRoute('/server-fn-chat')({ + component: ServerFnChat, +}) + +/** + * Demonstrates wiring `useChat` to a TanStack Start server function. + * + * The server function (`chatFn` in `lib/server-fns.ts`) returns + * `toServerSentEventsResponse(chat({ ... }))` — an SSE Response. The + * `stream()` connection adapter awaits the server function, detects the + * Response, and parses SSE chunks into the chat client. + * + * Compare to `routes/index.tsx`, which uses `fetchServerSentEvents('/api/...')` + * against an HTTP route handler. Same wire format; different invocation style. + */ +function ServerFnChat() { + const { messages, sendMessage, isLoading, error, stop } = useChat({ + connection: stream((messages) => + chatFn({ data: { messages: messages as Array } }), + ), + }) + const [input, setInput] = useState('') + + const handleSubmit = (e: React.FormEvent) => { + e.preventDefault() + if (!input.trim() || isLoading) return + void sendMessage(input) + setInput('') + } + + return ( +
+
+

Chat via server function

+

+ stream(() => chatFn({ data })){' '} + — the server function returns an SSE{' '} + Response; the adapter parses it. +

+
+ +
+ {messages.length === 0 && ( +

+ Say something to start the chat. +

+ )} + {messages.map((m) => ( +
+ {m.parts.map((part, i) => + part.type === 'text' ? {part.content} : null, + )} +
+ ))} + {error && ( +
+ {error.message} +
+ )} +
+ +
+ setInput(e.target.value)} + placeholder="Message..." + disabled={isLoading} + className="flex-1 rounded-lg bg-gray-800 border border-gray-700 px-3 py-2 text-sm focus:outline-none focus:border-cyan-500" + /> + {isLoading ? ( + + ) : ( + + )} +
+
+ ) +} From c4961b118066533275e33ebb7c2282e1ee9d6de6 Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:05:49 +1000 Subject: [PATCH 03/10] docs(ai-client): reframe stream() server-function description Lead with what stream() does (typed RPC into useChat), instead of calling a server function "just a fancy/async API endpoint." Same edits applied to the changeset and the stream() JSDoc. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/stream-adapter-server-functions.md | 2 +- docs/chat/connection-adapters.md | 2 +- packages/typescript/ai-client/src/connection-adapters.ts | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.changeset/stream-adapter-server-functions.md b/.changeset/stream-adapter-server-functions.md index ae024ca65..cf7e6238b 100644 --- a/.changeset/stream-adapter-server-functions.md +++ b/.changeset/stream-adapter-server-functions.md @@ -4,7 +4,7 @@ feat(ai-client): support TanStack Start server functions in `stream()` connection adapter -The `stream()` factory now accepts any of three return shapes, so a server function (which is just an async API endpoint) can be wired directly into `useChat`: +The `stream()` factory now accepts any of three return shapes, so a TanStack Start server function can be wired directly into `useChat`: - `AsyncIterable` — direct in-process stream (existing behavior) - `Promise>` — server function returning the chat stream diff --git a/docs/chat/connection-adapters.md b/docs/chat/connection-adapters.md index 7dbe080b8..3ee35e116 100644 --- a/docs/chat/connection-adapters.md +++ b/docs/chat/connection-adapters.md @@ -74,7 +74,7 @@ const { messages } = useChat({ ### TanStack Start Server Functions -A server function is just a fancy API endpoint, so `stream()` adapts one into a `useChat` connection. The factory you pass to `stream()` may return either the chat `AsyncIterable` directly, or an SSE `Response` produced by `toServerSentEventsResponse()` — both shapes work because `stream()` awaits the result and unwraps a `Response` if it sees one. +`stream()` adapts a TanStack Start server function into a `useChat` connection so you get end-to-end type safety from the call site to the handler. The factory you pass to `stream()` may return either the chat `AsyncIterable` directly, or an SSE `Response` produced by `toServerSentEventsResponse()` — `stream()` awaits the result and unwraps a `Response` if it sees one. #### Returning an SSE Response (recommended) diff --git a/packages/typescript/ai-client/src/connection-adapters.ts b/packages/typescript/ai-client/src/connection-adapters.ts index 0ad8c338d..ec6e03ca3 100644 --- a/packages/typescript/ai-client/src/connection-adapters.ts +++ b/packages/typescript/ai-client/src/connection-adapters.ts @@ -453,9 +453,9 @@ export type StreamFactoryResult = * 3. A TanStack Start server function whose handler returns an SSE `Response` * via `toServerSentEventsResponse(stream)` (returns `Promise`). * - * Server functions are just async API endpoints — the third shape is the - * recommended pattern when you want to keep network bytes small (SSE) and - * still get end-to-end type safety from the server function call. + * The third shape is the recommended pattern when you want to keep network + * bytes small (SSE) while preserving end-to-end type safety from the server + * function call. * * @param streamFactory - Function called per request; returns one of the shapes above. * From fdbd45ae5bfe16203d8826150ba7a8852a999eab Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 27 Apr 2026 08:22:44 +0000 Subject: [PATCH 04/10] ci: apply automated fixes --- examples/ts-react-chat/src/routes/server-fn-chat.tsx | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/ts-react-chat/src/routes/server-fn-chat.tsx b/examples/ts-react-chat/src/routes/server-fn-chat.tsx index 9d5ffd506..b2a7aeafa 100644 --- a/examples/ts-react-chat/src/routes/server-fn-chat.tsx +++ b/examples/ts-react-chat/src/routes/server-fn-chat.tsx @@ -40,9 +40,12 @@ function ServerFnChat() {

Chat via server function

- stream(() => chatFn({ data })){' '} + + stream(() => chatFn({ data })) + {' '} — the server function returns an SSE{' '} - Response; the adapter parses it. + Response; the adapter parses + it.

From ca99cc91a091a40e1f7a20d82b26412304017eda Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:27:27 +1000 Subject: [PATCH 05/10] fix: revert ai-fal package to upstream state The previous merge commit accidentally included stale references in @tanstack/ai-fal that this PR shouldn't have touched. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/typescript/ai-fal/package.json | 1 - packages/typescript/ai-fal/src/index.ts | 19 +------------------ 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/packages/typescript/ai-fal/package.json b/packages/typescript/ai-fal/package.json index 18023b322..097d3a4e0 100644 --- a/packages/typescript/ai-fal/package.json +++ b/packages/typescript/ai-fal/package.json @@ -25,7 +25,6 @@ "scripts": { "build": "vite build", "clean": "premove ./build ./dist", - "generate:model-registry": "tsx scripts/analyze-model-types.ts && tsx scripts/generate-registry.ts", "lint:fix": "eslint ./src --fix", "test:build": "publint --strict", "test:eslint": "eslint ./src", diff --git a/packages/typescript/ai-fal/src/index.ts b/packages/typescript/ai-fal/src/index.ts index 8cbdefb67..032227457 100644 --- a/packages/typescript/ai-fal/src/index.ts +++ b/packages/typescript/ai-fal/src/index.ts @@ -48,26 +48,9 @@ export { type FalModelVideoSize, } from './model-meta' // ============================================================================ -// Model Registry & Utils +// Utils // ============================================================================ -export { - IMAGE_MODEL_PATTERNS, - VIDEO_MODEL_PATTERNS, - getImageModelPattern, - getVideoModelPattern, - type ImageInputPattern, - type VideoInputPattern, -} from './model-registry' - -export { - convertToAspectRatio, - convertToResolutionPreset, - convertToNamedPreset, - parseWidthHeight, - formatDuration, -} from './utils/size-converters' - export { getFalApiKeyFromEnv, configureFalClient, From e0d0b9c7249a48dc5ef5a927cde5b8f736e4bad2 Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:09:22 +1000 Subject: [PATCH 06/10] fix(ai-client): use EventType enum in server-function stream tests Type-check was failing on CI because the new server-function and RPC async-iterable test fixtures yielded raw string literals for chunk type, which don't satisfy the EventType enum required by StreamChunk. Switch to the enum and add the required threadId to RUN_FINISHED. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ai-client/tests/connection-adapters.test.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/typescript/ai-client/tests/connection-adapters.test.ts b/packages/typescript/ai-client/tests/connection-adapters.test.ts index 0aff8b653..668b29e76 100644 --- a/packages/typescript/ai-client/tests/connection-adapters.test.ts +++ b/packages/typescript/ai-client/tests/connection-adapters.test.ts @@ -6,6 +6,7 @@ import { rpcStream, stream, } from '../src/connection-adapters' +import { EventType } from '@tanstack/ai' import type { StreamChunk } from '@tanstack/ai' /** Cast an event object to StreamChunk for type compatibility with EventType enum. */ @@ -844,7 +845,7 @@ describe('connection-adapters', () => { // async iterable directly: `createServerFn().handler(() => chat({...}))`. async function* serverStream(): AsyncGenerator { yield { - type: 'TEXT_MESSAGE_CONTENT', + type: EventType.TEXT_MESSAGE_CONTENT, messageId: 'msg-1', model: 'test', timestamp: Date.now(), @@ -852,7 +853,8 @@ describe('connection-adapters', () => { content: 'Hi', } yield { - type: 'RUN_FINISHED', + type: EventType.RUN_FINISHED, + threadId: 'thread-1', runId: 'run-1', model: 'test', timestamp: Date.now(), @@ -1113,7 +1115,7 @@ describe('connection-adapters', () => { it('should await Promise from RPC call', async () => { async function* rpcStreamGen(): AsyncGenerator { yield { - type: 'TEXT_MESSAGE_CONTENT', + type: EventType.TEXT_MESSAGE_CONTENT, messageId: 'msg-1', model: 'test', timestamp: Date.now(), From 67458d8a53ff942d6c6cecc712cb69866a1bca17 Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:36:20 +1000 Subject: [PATCH 07/10] fix(ai-client): drop unsafe casts, surface SSE parse errors, forward abortSignal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review findings on the server-function stream() adapter PR: - Synthesized RUN_FINISHED/RUN_ERROR events in normalizeConnectionAdapter no longer use `as unknown as StreamChunk`. Track threadId/runId from upstream chunks during iteration and reuse them; fall back to synthesized IDs only when no upstream chunk carried them. Use the EventType enum and typed RunFinishedEvent/RunErrorEvent so missing required fields are caught by the compiler instead of papered over. - Stop swallowing JSON.parse failures in parseSSEChunks and fetchHttpStream. A malformed mid-stream chunk is a protocol error; let it throw so it surfaces as RUN_ERROR via the connect-wrapper's catch path instead of silently dropping data behind a console.warn the user never sees. - Widen stream() and rpcStream() factory signatures with an optional abortSignal third arg and pass it through. Backwards-compatible — callers that ignore the third parameter are unaffected. Lets long-running server functions cancel in-flight work when useChat aborts. Tests updated to assert SyntaxError propagation rather than silent dropping on malformed JSON, and to expect the new third call argument on factory mocks. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ai-client/src/connection-adapters.ts | 69 +++++++++++-------- .../tests/connection-adapters.test.ts | 50 +++++--------- 2 files changed, 57 insertions(+), 62 deletions(-) diff --git a/packages/typescript/ai-client/src/connection-adapters.ts b/packages/typescript/ai-client/src/connection-adapters.ts index 2a02dac4d..f667a655f 100644 --- a/packages/typescript/ai-client/src/connection-adapters.ts +++ b/packages/typescript/ai-client/src/connection-adapters.ts @@ -1,4 +1,11 @@ -import type { ModelMessage, StreamChunk, UIMessage } from '@tanstack/ai' +import { EventType } from '@tanstack/ai' +import type { + ModelMessage, + RunErrorEvent, + RunFinishedEvent, + StreamChunk, + UIMessage, +} from '@tanstack/ai' /** * Merge custom headers into request headers @@ -22,6 +29,9 @@ function mergeHeaders( /** * Parse SSE-formatted lines into StreamChunks. * Handles both `data: {...}` lines and bare JSON lines, and skips `[DONE]`. + * + * Malformed JSON throws — a parse failure mid-stream is a protocol error and + * should surface as RUN_ERROR rather than silently dropping chunks. */ async function* parseSSEChunks( lines: AsyncIterable, @@ -34,11 +44,7 @@ async function* parseSSEChunks( ) continue } - try { - yield JSON.parse(data) as StreamChunk - } catch { - console.warn('Failed to parse SSE chunk:', data) - } + yield JSON.parse(data) as StreamChunk } } @@ -217,9 +223,17 @@ export function normalizeConnectionAdapter( }, async send(messages, data, abortSignal) { let hasTerminalEvent = false + let upstreamThreadId: string | undefined + let upstreamRunId: string | undefined try { const stream = connection.connect(messages, data, abortSignal) for await (const chunk of stream) { + if ('threadId' in chunk && typeof chunk.threadId === 'string') { + upstreamThreadId = chunk.threadId + } + if ('runId' in chunk && typeof chunk.runId === 'string') { + upstreamRunId = chunk.runId + } if (chunk.type === 'RUN_FINISHED' || chunk.type === 'RUN_ERROR') { hasTerminalEvent = true } @@ -229,28 +243,26 @@ export function normalizeConnectionAdapter( // If the connect stream ended cleanly without a terminal event, // synthesize RUN_FINISHED so request-scoped consumers can complete. if (!abortSignal?.aborted && !hasTerminalEvent) { - push({ - type: 'RUN_FINISHED', - runId: `run-${Date.now()}`, + const synthetic: RunFinishedEvent = { + type: EventType.RUN_FINISHED, + threadId: upstreamThreadId ?? `thread-${Date.now()}`, + runId: upstreamRunId ?? `run-${Date.now()}`, model: 'connect-wrapper', timestamp: Date.now(), finishReason: 'stop', - } as unknown as StreamChunk) + } + push(synthetic) } } catch (err) { if (!abortSignal?.aborted && !hasTerminalEvent) { - push({ - type: 'RUN_ERROR', + const message = + err instanceof Error ? err.message : 'Unknown error in connect()' + const synthetic: RunErrorEvent = { + type: EventType.RUN_ERROR, timestamp: Date.now(), - message: - err instanceof Error ? err.message : 'Unknown error in connect()', - error: { - message: - err instanceof Error - ? err.message - : 'Unknown error in connect()', - }, - } as unknown as StreamChunk) + message, + } + push(synthetic) } throw err } @@ -425,12 +437,7 @@ export function fetchHttpStream( } for await (const line of readStreamLines(reader, abortSignal)) { - try { - const parsed: StreamChunk = JSON.parse(line) - yield parsed - } catch (parseError) { - console.warn('Failed to parse HTTP stream chunk:', line) - } + yield JSON.parse(line) as StreamChunk } }, } @@ -492,11 +499,12 @@ export function stream( streamFactory: ( messages: Array | Array, data?: Record, + abortSignal?: AbortSignal, ) => StreamFactoryResult, ): ConnectConnectionAdapter { return { async *connect(messages, data, abortSignal) { - const result = await streamFactory(messages, data) + const result = await streamFactory(messages, data, abortSignal) if (result instanceof Response) { yield* responseToSSEChunks(result, abortSignal) } else { @@ -528,11 +536,12 @@ export function rpcStream( rpcCall: ( messages: Array | Array, data?: Record, + abortSignal?: AbortSignal, ) => AsyncIterable | Promise>, ): ConnectConnectionAdapter { return { - async *connect(messages, data) { - const iterable = await rpcCall(messages, data) + async *connect(messages, data, abortSignal) { + const iterable = await rpcCall(messages, data, abortSignal) yield* iterable }, } diff --git a/packages/typescript/ai-client/tests/connection-adapters.test.ts b/packages/typescript/ai-client/tests/connection-adapters.test.ts index 668b29e76..ec801974f 100644 --- a/packages/typescript/ai-client/tests/connection-adapters.test.ts +++ b/packages/typescript/ai-client/tests/connection-adapters.test.ts @@ -145,11 +145,7 @@ describe('connection-adapters', () => { warnSpy.mockRestore() }) - it('should handle malformed JSON gracefully', async () => { - const consoleWarnSpy = vi - .spyOn(console, 'warn') - .mockImplementation(() => {}) - + it('should throw on malformed SSE chunks rather than silently dropping them', async () => { const mockReader = { read: vi .fn() @@ -171,17 +167,13 @@ describe('connection-adapters', () => { fetchMock.mockResolvedValue(mockResponse as any) const adapter = fetchServerSentEvents('/api/chat') - const chunks: Array = [] - - for await (const chunk of adapter.connect([ - { role: 'user', content: 'Hello' }, - ])) { - chunks.push(chunk) - } - - expect(chunks).toHaveLength(0) - expect(consoleWarnSpy).toHaveBeenCalled() - consoleWarnSpy.mockRestore() + await expect(async () => { + for await (const _ of adapter.connect([ + { role: 'user', content: 'Hello' }, + ])) { + // drain + } + }).rejects.toThrow(SyntaxError) }) it('should handle HTTP errors', async () => { @@ -551,11 +543,7 @@ describe('connection-adapters', () => { expect(chunks).toHaveLength(1) }) - it('should handle malformed JSON gracefully', async () => { - const consoleWarnSpy = vi - .spyOn(console, 'warn') - .mockImplementation(() => {}) - + it('should throw on malformed JSON chunks rather than silently dropping them', async () => { const mockReader = { read: vi .fn() @@ -577,17 +565,13 @@ describe('connection-adapters', () => { fetchMock.mockResolvedValue(mockResponse as any) const adapter = fetchHttpStream('/api/chat') - const chunks: Array = [] - - for await (const chunk of adapter.connect([ - { role: 'user', content: 'Hello' }, - ])) { - chunks.push(chunk) - } - - expect(chunks).toHaveLength(0) - expect(consoleWarnSpy).toHaveBeenCalled() - consoleWarnSpy.mockRestore() + await expect(async () => { + for await (const _ of adapter.connect([ + { role: 'user', content: 'Hello' }, + ])) { + // drain + } + }).rejects.toThrow(SyntaxError) }) it('should handle HTTP errors', async () => { @@ -837,6 +821,7 @@ describe('connection-adapters', () => { expect(streamFactory).toHaveBeenCalledWith( expect.arrayContaining([expect.objectContaining({ role: 'user' })]), data, + undefined, ) }) @@ -1109,6 +1094,7 @@ describe('connection-adapters', () => { expect(rpcCall).toHaveBeenCalledWith( expect.arrayContaining([expect.objectContaining({ role: 'user' })]), data, + undefined, ) }) From 297085f823c98825f6ee1d407b2f595c1f38e837 Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Tue, 28 Apr 2026 09:23:01 +1000 Subject: [PATCH 08/10] fix(ai-client): filter SSE non-payload fields and drop unterminated trailing buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refine the SSE parser so the throw-on-parse-failure behavior doesn't regress on legitimate SSE traffic: - parseSSEChunks now skips SSE comment lines (`:`) and non-data fields (`event:`, `id:`, `retry:`) which proxies and CDNs commonly inject as keepalives. Previously these would have flowed into JSON.parse and thrown, killing otherwise-healthy streams behind any infrastructure that injects SSE control lines. - readStreamLines no longer yields the unterminated trailing buffer at stream end. A non-empty buffer means the connection was cut mid-line, so the content is partial by definition — yielding it would feed truncated JSON to the parser and surface a misleading RUN_ERROR for what is really a transport-layer issue. Warn and discard instead. Bare-line JSON (legacy/raw mode) is still accepted to preserve the existing public behavior covered by the `should handle SSE format without data: prefix` test. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ai-client/src/connection-adapters.ts | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/packages/typescript/ai-client/src/connection-adapters.ts b/packages/typescript/ai-client/src/connection-adapters.ts index f667a655f..8aa1048a8 100644 --- a/packages/typescript/ai-client/src/connection-adapters.ts +++ b/packages/typescript/ai-client/src/connection-adapters.ts @@ -28,15 +28,27 @@ function mergeHeaders( /** * Parse SSE-formatted lines into StreamChunks. - * Handles both `data: {...}` lines and bare JSON lines, and skips `[DONE]`. * - * Malformed JSON throws — a parse failure mid-stream is a protocol error and - * should surface as RUN_ERROR rather than silently dropping chunks. + * Accepts either `data: {...}` SSE lines or bare JSON lines (legacy/raw mode). + * Skips non-payload SSE fields (comments starting with `:`, and `event:` / + * `id:` / `retry:` lines) — proxies and CDNs may inject these as keepalives, + * and they are not malformed JSON. + * + * A JSON parse failure on an actual payload line throws (surfacing as + * RUN_ERROR through the connect-wrapper) rather than being silently dropped. */ async function* parseSSEChunks( lines: AsyncIterable, ): AsyncGenerator { for await (const line of lines) { + if ( + line.startsWith(':') || + line.startsWith('event:') || + line.startsWith('id:') || + line.startsWith('retry:') + ) { + continue + } const data = line.startsWith('data: ') ? line.slice(6) : line if (data === '[DONE]') { console.warn( @@ -101,9 +113,14 @@ async function* readStreamLines( } } - // Process any remaining data in the buffer + // Drop any unterminated trailing buffer. A non-empty buffer at stream end + // means the connection was cut mid-line (server crash, dropped TCP), so + // the content is by definition partial — yielding it would feed truncated + // JSON to downstream parsers and produce a confusing RUN_ERROR. if (buffer.trim()) { - yield buffer + console.warn( + '[@tanstack/ai-client] Stream ended with unterminated trailing data; discarding. The connection was likely cut short.', + ) } } finally { reader.releaseLock() From cb39b842aa3db6adf7d896b362391ab798d5daa3 Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Tue, 28 Apr 2026 10:51:13 +1000 Subject: [PATCH 09/10] feat(ai-client,ai): narrow stream() callback to UIMessage[] and accept UIMessage[] in chat() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drops the `as any` / `as Array` casts previously needed when wiring useChat through a TanStack Start server function into chat(). The stream() factory now declares Array (with a runtime assert matching the ChatClient invariant), and chat()'s messages option accepts UIMessage[] directly alongside ConstrainedModelMessage[] — the runtime already normalised both via convertMessagesToModelMessages. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/stream-adapter-server-functions.md | 5 +++ examples/ts-react-chat/src/lib/server-fns.ts | 4 +- .../src/routes/server-fn-chat.tsx | 5 +-- .../ai-client/src/connection-adapters.ts | 28 +++++++++++- .../tests/connection-adapters.test.ts | 44 +++++++++---------- .../ai/src/activities/chat/index.ts | 27 +++++++----- 6 files changed, 69 insertions(+), 44 deletions(-) diff --git a/.changeset/stream-adapter-server-functions.md b/.changeset/stream-adapter-server-functions.md index cf7e6238b..9d4445765 100644 --- a/.changeset/stream-adapter-server-functions.md +++ b/.changeset/stream-adapter-server-functions.md @@ -1,5 +1,6 @@ --- '@tanstack/ai-client': minor +'@tanstack/ai': minor --- feat(ai-client): support TanStack Start server functions in `stream()` connection adapter @@ -21,3 +22,7 @@ const chatFn = createServerFn({ method: 'POST' }) useChat({ connection: stream((messages) => chatFn({ data: { messages } })) }) ``` + +The `stream()` callback's `messages` parameter is now typed as `Array` (was `Array | Array`) — matching what `useChat`/`ChatClient` actually sends. A runtime assertion guards against misuse. Existing callbacks typed against the union remain assignable (wider declared input satisfies narrower expected input). + +`chat()`'s `messages` option now also accepts `Array` directly, in addition to the existing `Array>`. The runtime already handled both via `convertMessagesToModelMessages`; the public type now matches. This eliminates the `as any` cast previously needed when forwarding `UIMessage[]` from a server-function validator into `chat()`. diff --git a/examples/ts-react-chat/src/lib/server-fns.ts b/examples/ts-react-chat/src/lib/server-fns.ts index 56307775a..12e46b6e3 100644 --- a/examples/ts-react-chat/src/lib/server-fns.ts +++ b/examples/ts-react-chat/src/lib/server-fns.ts @@ -386,9 +386,7 @@ export const chatFn = createServerFn({ method: 'POST' }) toServerSentEventsResponse( chat({ adapter: openaiText('gpt-4o'), - // chat() converts UIMessage[] to ModelMessage[] internally, but the - // exported type signature only accepts ModelMessage[] — cast to any. - messages: data.messages as any, + messages: data.messages, systemPrompts: [ 'You are a helpful assistant. Keep replies short and friendly.', ], diff --git a/examples/ts-react-chat/src/routes/server-fn-chat.tsx b/examples/ts-react-chat/src/routes/server-fn-chat.tsx index b2a7aeafa..f605fecc0 100644 --- a/examples/ts-react-chat/src/routes/server-fn-chat.tsx +++ b/examples/ts-react-chat/src/routes/server-fn-chat.tsx @@ -3,7 +3,6 @@ import { createFileRoute } from '@tanstack/react-router' import { stream, useChat } from '@tanstack/ai-react' import { Send, Square } from 'lucide-react' import { chatFn } from '@/lib/server-fns' -import type { UIMessage } from '@tanstack/ai' export const Route = createFileRoute('/server-fn-chat')({ component: ServerFnChat, @@ -22,9 +21,7 @@ export const Route = createFileRoute('/server-fn-chat')({ */ function ServerFnChat() { const { messages, sendMessage, isLoading, error, stop } = useChat({ - connection: stream((messages) => - chatFn({ data: { messages: messages as Array } }), - ), + connection: stream((messages) => chatFn({ data: { messages } })), }) const [input, setInput] = useState('') diff --git a/packages/typescript/ai-client/src/connection-adapters.ts b/packages/typescript/ai-client/src/connection-adapters.ts index 8aa1048a8..7f4d7561f 100644 --- a/packages/typescript/ai-client/src/connection-adapters.ts +++ b/packages/typescript/ai-client/src/connection-adapters.ts @@ -473,10 +473,33 @@ export type StreamFactoryResult = | AsyncIterable | Promise | Response> +/** + * Runtime invariant: `ChatClient` always passes `Array` to the + * connection adapter (see `chat-client.ts` — `processor.getMessages()` returns + * `Array`). We assert this so `stream()`'s callback can be typed + * with the narrower, more useful `Array` signature without an `as` + * cast — the asserts function narrows the union for the type checker. + */ +function assertUIMessages( + messages: Array | Array, +): asserts messages is Array { + const first = messages[0] + if (first === undefined) return + // UIMessage exposes `parts`; ModelMessage exposes `content`. + if (!('parts' in first)) { + throw new TypeError( + 'stream() expects UIMessage[]. Convert ModelMessage[] to UIMessage[] ' + + 'first (e.g. with modelMessagesToUIMessages from @tanstack/ai), or ' + + 'use rpcStream() for ModelMessage-shaped streams.', + ) + } +} + /** * Create a direct stream connection adapter. * - * Accepts any of: + * The factory callback receives `Array` — the message shape used by + * `useChat` and the chat client. Accepts any of: * * 1. An in-process async iterable factory (returns `AsyncIterable`). * 2. A TanStack Start server function whose handler returns the chat stream @@ -514,13 +537,14 @@ export type StreamFactoryResult = */ export function stream( streamFactory: ( - messages: Array | Array, + messages: Array, data?: Record, abortSignal?: AbortSignal, ) => StreamFactoryResult, ): ConnectConnectionAdapter { return { async *connect(messages, data, abortSignal) { + assertUIMessages(messages) const result = await streamFactory(messages, data, abortSignal) if (result instanceof Response) { yield* responseToSSEChunks(result, abortSignal) diff --git a/packages/typescript/ai-client/tests/connection-adapters.test.ts b/packages/typescript/ai-client/tests/connection-adapters.test.ts index ec801974f..9bf906eb1 100644 --- a/packages/typescript/ai-client/tests/connection-adapters.test.ts +++ b/packages/typescript/ai-client/tests/connection-adapters.test.ts @@ -7,12 +7,19 @@ import { stream, } from '../src/connection-adapters' import { EventType } from '@tanstack/ai' -import type { StreamChunk } from '@tanstack/ai' +import type { StreamChunk, UIMessage } from '@tanstack/ai' /** Cast an event object to StreamChunk for type compatibility with EventType enum. */ const asChunk = (chunk: Record) => chunk as unknown as StreamChunk +/** Build a minimal user UIMessage for tests that just need a non-empty input. */ +const userUIMessage = (content = 'Hello'): UIMessage => ({ + id: 'u1', + role: 'user', + parts: [{ type: 'text', content }], +}) + describe('connection-adapters', () => { let originalFetch: typeof fetch let fetchMock: ReturnType @@ -787,9 +794,7 @@ describe('connection-adapters', () => { const adapter = stream(streamFactory) const chunks: Array = [] - for await (const chunk of adapter.connect([ - { role: 'user', content: 'Hello' }, - ])) { + for await (const chunk of adapter.connect([userUIMessage()])) { chunks.push(chunk) } @@ -811,10 +816,7 @@ describe('connection-adapters', () => { const adapter = stream(streamFactory) const data = { key: 'value' } - for await (const _ of adapter.connect( - [{ role: 'user', content: 'Hello' }], - data, - )) { + for await (const _ of adapter.connect([userUIMessage()], data)) { // Consume } @@ -855,9 +857,7 @@ describe('connection-adapters', () => { const adapter = stream((messages, data) => serverFn(messages, data)) const chunks: Array = [] - for await (const chunk of adapter.connect([ - { role: 'user', content: 'Hello' }, - ])) { + for await (const chunk of adapter.connect([userUIMessage()])) { chunks.push(chunk) } @@ -879,9 +879,7 @@ describe('connection-adapters', () => { const adapter = stream(() => serverFn()) const chunks: Array = [] - for await (const chunk of adapter.connect([ - { role: 'user', content: 'Hello' }, - ])) { + for await (const chunk of adapter.connect([userUIMessage()])) { chunks.push(chunk) } @@ -901,9 +899,7 @@ describe('connection-adapters', () => { await expect( (async () => { - for await (const _ of adapter.connect([ - { role: 'user', content: 'Hello' }, - ])) { + for await (const _ of adapter.connect([userUIMessage()])) { // Consume } })(), @@ -968,7 +964,7 @@ describe('connection-adapters', () => { return received })() - await adapter.send([{ role: 'user', content: 'Hello' }]) + await adapter.send([userUIMessage()]) const received = await receivedPromise expect(received).toHaveLength(2) @@ -993,9 +989,9 @@ describe('connection-adapters', () => { return received })() - await expect( - adapter.send([{ role: 'user', content: 'Hello' }]), - ).rejects.toThrow('connect exploded') + await expect(adapter.send([userUIMessage()])).rejects.toThrow( + 'connect exploded', + ) const received = await receivedPromise expect(received).toHaveLength(1) @@ -1027,9 +1023,9 @@ describe('connection-adapters', () => { return received })() - await expect( - adapter.send([{ role: 'user', content: 'Hello' }]), - ).rejects.toThrow('connect exploded') + await expect(adapter.send([userUIMessage()])).rejects.toThrow( + 'connect exploded', + ) const received = await receivedPromise expect(received).toHaveLength(1) diff --git a/packages/typescript/ai/src/activities/chat/index.ts b/packages/typescript/ai/src/activities/chat/index.ts index e1327fdb5..01e733171 100644 --- a/packages/typescript/ai/src/activities/chat/index.ts +++ b/packages/typescript/ai/src/activities/chat/index.ts @@ -45,6 +45,7 @@ import type { ToolCallArgsEvent, ToolCallEndEvent, ToolCallStartEvent, + UIMessage, } from '../../types' import type { ChatMiddleware, @@ -82,12 +83,20 @@ export interface TextActivityOptions< > { /** The text adapter to use (created by a provider function like openaiText('gpt-4o')) */ adapter: TAdapter - /** Conversation messages - content types are constrained by the adapter's input modalities and metadata */ + /** + * Conversation messages. + * + * Accepts both `UIMessage` (parts-based, the shape used by `useChat` and the + * client store) and `ConstrainedModelMessage` (provider-shaped, content-type + * constrained by the adapter's input modalities and metadata). The runtime + * normalises both via `convertMessagesToModelMessages`. + */ messages?: Array< - ConstrainedModelMessage<{ - inputModalities: TAdapter['~types']['inputModalities'] - messageMetadataByModality: TAdapter['~types']['messageMetadataByModality'] - }> + | UIMessage + | ConstrainedModelMessage<{ + inputModalities: TAdapter['~types']['inputModalities'] + messageMetadataByModality: TAdapter['~types']['messageMetadataByModality'] + }> > /** System prompts to prepend to the conversation */ systemPrompts?: TextOptions['systemPrompts'] @@ -320,17 +329,13 @@ class TextEngine< // Extract client state (approvals, client tool results) from original messages BEFORE conversion // This preserves UIMessage parts data that would be lost during conversion to ModelMessage const { approvals, clientToolResults } = - this.extractClientStateFromOriginalMessages( - config.params.messages as Array, - ) + this.extractClientStateFromOriginalMessages(config.params.messages) this.initialApprovals = approvals this.initialClientToolResults = clientToolResults // Convert messages to ModelMessage format (handles both UIMessage and ModelMessage input) // This ensures consistent internal format regardless of what the client sends - this.messages = convertMessagesToModelMessages( - config.params.messages as Array, - ) + this.messages = convertMessagesToModelMessages(config.params.messages) // Initialize lazy tool manager after messages are converted (needs message history for scanning) this.lazyToolManager = new LazyToolManager( From e773e88fcb2025871d3bbd3e28ff2dd0e1b10ba4 Mon Sep 17 00:00:00 2001 From: Tom Beckenham <34339192+tombeckenham@users.noreply.github.com> Date: Tue, 28 Apr 2026 10:56:58 +1000 Subject: [PATCH 10/10] revert(ai): drop chat() messages type widening from this PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pulls the @tanstack/ai changes back out — the chat() messages-option widening to accept UIMessage[] is a separate concern from the stream() server-function feature this PR is about. Restores the example's `as any` cast with a comment, drops the @tanstack/ai minor bump from the changeset, and reverts chat/index.ts to its pre-PR state. Also bumps the example adapter to gpt-5.2. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/stream-adapter-server-functions.md | 3 --- examples/ts-react-chat/src/lib/server-fns.ts | 7 +++-- .../ai/src/activities/chat/index.ts | 27 ++++++++----------- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/.changeset/stream-adapter-server-functions.md b/.changeset/stream-adapter-server-functions.md index 9d4445765..6ca95b5b9 100644 --- a/.changeset/stream-adapter-server-functions.md +++ b/.changeset/stream-adapter-server-functions.md @@ -1,6 +1,5 @@ --- '@tanstack/ai-client': minor -'@tanstack/ai': minor --- feat(ai-client): support TanStack Start server functions in `stream()` connection adapter @@ -24,5 +23,3 @@ useChat({ connection: stream((messages) => chatFn({ data: { messages } })) }) ``` The `stream()` callback's `messages` parameter is now typed as `Array` (was `Array | Array`) — matching what `useChat`/`ChatClient` actually sends. A runtime assertion guards against misuse. Existing callbacks typed against the union remain assignable (wider declared input satisfies narrower expected input). - -`chat()`'s `messages` option now also accepts `Array` directly, in addition to the existing `Array>`. The runtime already handled both via `convertMessagesToModelMessages`; the public type now matches. This eliminates the `as any` cast previously needed when forwarding `UIMessage[]` from a server-function validator into `chat()`. diff --git a/examples/ts-react-chat/src/lib/server-fns.ts b/examples/ts-react-chat/src/lib/server-fns.ts index 12e46b6e3..d2ae04250 100644 --- a/examples/ts-react-chat/src/lib/server-fns.ts +++ b/examples/ts-react-chat/src/lib/server-fns.ts @@ -385,8 +385,11 @@ export const chatFn = createServerFn({ method: 'POST' }) .handler(({ data }) => toServerSentEventsResponse( chat({ - adapter: openaiText('gpt-4o'), - messages: data.messages, + adapter: openaiText('gpt-5.2'), + // chat()'s messages option is typed as ConstrainedModelMessage[], but the + // runtime accepts UIMessage[] too (normalised via convertMessagesToModelMessages). + // Cast to bridge the gap until the public type is widened in a separate PR. + messages: data.messages as any, systemPrompts: [ 'You are a helpful assistant. Keep replies short and friendly.', ], diff --git a/packages/typescript/ai/src/activities/chat/index.ts b/packages/typescript/ai/src/activities/chat/index.ts index 01e733171..e1327fdb5 100644 --- a/packages/typescript/ai/src/activities/chat/index.ts +++ b/packages/typescript/ai/src/activities/chat/index.ts @@ -45,7 +45,6 @@ import type { ToolCallArgsEvent, ToolCallEndEvent, ToolCallStartEvent, - UIMessage, } from '../../types' import type { ChatMiddleware, @@ -83,20 +82,12 @@ export interface TextActivityOptions< > { /** The text adapter to use (created by a provider function like openaiText('gpt-4o')) */ adapter: TAdapter - /** - * Conversation messages. - * - * Accepts both `UIMessage` (parts-based, the shape used by `useChat` and the - * client store) and `ConstrainedModelMessage` (provider-shaped, content-type - * constrained by the adapter's input modalities and metadata). The runtime - * normalises both via `convertMessagesToModelMessages`. - */ + /** Conversation messages - content types are constrained by the adapter's input modalities and metadata */ messages?: Array< - | UIMessage - | ConstrainedModelMessage<{ - inputModalities: TAdapter['~types']['inputModalities'] - messageMetadataByModality: TAdapter['~types']['messageMetadataByModality'] - }> + ConstrainedModelMessage<{ + inputModalities: TAdapter['~types']['inputModalities'] + messageMetadataByModality: TAdapter['~types']['messageMetadataByModality'] + }> > /** System prompts to prepend to the conversation */ systemPrompts?: TextOptions['systemPrompts'] @@ -329,13 +320,17 @@ class TextEngine< // Extract client state (approvals, client tool results) from original messages BEFORE conversion // This preserves UIMessage parts data that would be lost during conversion to ModelMessage const { approvals, clientToolResults } = - this.extractClientStateFromOriginalMessages(config.params.messages) + this.extractClientStateFromOriginalMessages( + config.params.messages as Array, + ) this.initialApprovals = approvals this.initialClientToolResults = clientToolResults // Convert messages to ModelMessage format (handles both UIMessage and ModelMessage input) // This ensures consistent internal format regardless of what the client sends - this.messages = convertMessagesToModelMessages(config.params.messages) + this.messages = convertMessagesToModelMessages( + config.params.messages as Array, + ) // Initialize lazy tool manager after messages are converted (needs message history for scanning) this.lazyToolManager = new LazyToolManager(