Skip to content

Commit d187180

Browse files
waleedlatif1claude
andcommitted
feat(copilot): add copilot_messages table with dual-write rollout
Splits copilot chat messages out of the copilot_chats.messages JSONB column into a dedicated copilot_messages table. JSONB stays canonical during R+0 — every write path dual-writes to the new table best-effort (try/catch + log warn, never throws). Migration 0213 creates the table + indexes and inline-backfills history from JSONB so OSS self-hosters don't need to run a separate script. Write paths covered: - post.ts (user message append) - terminal-state.ts (assistant turn finalize) - update-messages/route.ts (snapshot replace) - inbox/executor.ts (background turn) - fork/route.ts (chat clone) - superuser/import-workflow/route.ts (chat import) Each call threads chatModel + streamId where relevant; ON CONFLICT DO UPDATE preserves existing stream_id / model via COALESCE. For pre-R+1 reconciliation, run: bun apps/sim/scripts/copilot-messages-reconcile.ts [--since='7 days'] Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent a14d374 commit d187180

15 files changed

Lines changed: 17755 additions & 27 deletions

File tree

apps/sim/app/api/copilot/chat/update-messages/route.test.ts

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,25 @@ import { authMockFns } from '@sim/testing'
77
import { NextRequest } from 'next/server'
88
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
99

10-
const { mockSelect, mockFrom, mockWhere, mockLimit, mockUpdate, mockSet, mockUpdateWhere } =
11-
vi.hoisted(() => ({
12-
mockSelect: vi.fn(),
13-
mockFrom: vi.fn(),
14-
mockWhere: vi.fn(),
15-
mockLimit: vi.fn(),
16-
mockUpdate: vi.fn(),
17-
mockSet: vi.fn(),
18-
mockUpdateWhere: vi.fn(),
19-
}))
10+
const {
11+
mockSelect,
12+
mockFrom,
13+
mockWhere,
14+
mockLimit,
15+
mockUpdate,
16+
mockSet,
17+
mockUpdateWhere,
18+
mockReturning,
19+
} = vi.hoisted(() => ({
20+
mockSelect: vi.fn(),
21+
mockFrom: vi.fn(),
22+
mockWhere: vi.fn(),
23+
mockLimit: vi.fn(),
24+
mockUpdate: vi.fn(),
25+
mockSet: vi.fn(),
26+
mockUpdateWhere: vi.fn(),
27+
mockReturning: vi.fn(),
28+
}))
2029

2130
vi.mock('@sim/db', () => ({
2231
db: {
@@ -51,8 +60,9 @@ describe('Copilot Chat Update Messages API Route', () => {
5160
mockWhere.mockReturnValue({ limit: mockLimit })
5261
mockLimit.mockResolvedValue([])
5362
mockUpdate.mockReturnValue({ set: mockSet })
54-
mockUpdateWhere.mockResolvedValue(undefined)
5563
mockSet.mockReturnValue({ where: mockUpdateWhere })
64+
mockUpdateWhere.mockReturnValue({ returning: mockReturning })
65+
mockReturning.mockResolvedValue([{ model: 'gpt-4' }])
5666
})
5767

5868
afterEach(() => {

apps/sim/app/api/copilot/chat/update-messages/route.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { type NextRequest, NextResponse } from 'next/server'
66
import { updateCopilotMessagesContract } from '@/lib/api/contracts/copilot'
77
import { parseRequest } from '@/lib/api/server'
88
import { getAccessibleCopilotChatAuth } from '@/lib/copilot/chat/lifecycle'
9+
import { replaceCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
910
import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message'
1011
import {
1112
authenticateCopilotRequestSessionOnly,
@@ -86,7 +87,16 @@ export const POST = withRouteHandler(async (req: NextRequest) => {
8687
updateData.config = config
8788
}
8889

89-
await db.update(copilotChats).set(updateData).where(eq(copilotChats.id, chatId))
90+
const [updated] = await db
91+
.update(copilotChats)
92+
.set(updateData)
93+
.where(eq(copilotChats.id, chatId))
94+
.returning({ model: copilotChats.model })
95+
if (updated) {
96+
await replaceCopilotChatMessages(chatId, normalizedMessages, {
97+
chatModel: updated.model ?? null,
98+
})
99+
}
90100

91101
logger.info(`[${tracker.requestId}] Successfully updated chat`, {
92102
chatId,

apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { eq } from 'drizzle-orm'
66
import { type NextRequest, NextResponse } from 'next/server'
77
import { forkMothershipChatContract } from '@/lib/api/contracts/mothership-tasks'
88
import { parseRequest } from '@/lib/api/server'
9+
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
910
import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
1011
import { fetchGo } from '@/lib/copilot/request/go/fetch'
1112
import {
@@ -102,6 +103,8 @@ export const POST = withRouteHandler(
102103
return createInternalServerErrorResponse('Failed to create forked chat')
103104
}
104105

106+
await appendCopilotChatMessages(newId, forkedMessages, { chatModel: parent.model })
107+
105108
// Clone copilot-service conversation state (messages, active_messages, memory files).
106109
// Best-effort: if the copilot service doesn't have a row for the source chat yet, skip.
107110
try {

apps/sim/app/api/superuser/import-workflow/route.ts

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import { type NextRequest, NextResponse } from 'next/server'
77
import { importWorkflowAsSuperuserContract } from '@/lib/api/contracts/workflows'
88
import { parseRequest } from '@/lib/api/server'
99
import { getSession } from '@/lib/auth'
10+
import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write'
11+
import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
1012
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1113
import { verifyEffectiveSuperUser } from '@/lib/templates/permissions'
1214
import { parseWorkflowJson } from '@/lib/workflows/operations/import-export'
@@ -172,19 +174,27 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
172174
let copilotChatsImported = 0
173175

174176
for (const chat of sourceCopilotChats) {
175-
await db.insert(copilotChats).values({
176-
userId: session.user.id,
177-
workflowId: newWorkflowId,
178-
title: chat.title ? `[Import] ${chat.title}` : null,
179-
messages: chat.messages,
180-
model: chat.model,
181-
conversationId: null, // Don't copy conversation ID
182-
previewYaml: chat.previewYaml,
183-
planArtifact: chat.planArtifact,
184-
config: chat.config,
185-
createdAt: new Date(),
186-
updatedAt: new Date(),
187-
})
177+
const [imported] = await db
178+
.insert(copilotChats)
179+
.values({
180+
userId: session.user.id,
181+
workflowId: newWorkflowId,
182+
title: chat.title ? `[Import] ${chat.title}` : null,
183+
messages: chat.messages,
184+
model: chat.model,
185+
conversationId: null, // Don't copy conversation ID
186+
previewYaml: chat.previewYaml,
187+
planArtifact: chat.planArtifact,
188+
config: chat.config,
189+
createdAt: new Date(),
190+
updatedAt: new Date(),
191+
})
192+
.returning({ id: copilotChats.id })
193+
if (imported && Array.isArray(chat.messages) && chat.messages.length > 0) {
194+
await appendCopilotChatMessages(imported.id, chat.messages as PersistedMessage[], {
195+
chatModel: chat.model,
196+
})
197+
}
188198
copilotChatsImported++
189199
}
190200

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing'
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
7+
vi.mock('@sim/db', () => dbChainMock)
8+
9+
import {
10+
appendCopilotChatMessages,
11+
replaceCopilotChatMessages,
12+
} from '@/lib/copilot/chat/messages-dual-write'
13+
import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
14+
15+
const userMsg: PersistedMessage = {
16+
id: 'msg-user-1',
17+
role: 'user',
18+
content: 'Hello',
19+
timestamp: '2026-01-01T00:00:00.000Z',
20+
}
21+
22+
const assistantMsg: PersistedMessage = {
23+
id: 'msg-asst-1',
24+
role: 'assistant',
25+
content: 'Hi back',
26+
timestamp: '2026-01-01T00:00:01.000Z',
27+
}
28+
29+
describe('messages-dual-write', () => {
30+
beforeEach(() => {
31+
vi.clearAllMocks()
32+
resetDbChainMock()
33+
})
34+
35+
describe('appendCopilotChatMessages', () => {
36+
it('is a no-op on empty array', async () => {
37+
await appendCopilotChatMessages('chat-1', [])
38+
expect(dbChainMockFns.insert).not.toHaveBeenCalled()
39+
})
40+
41+
it('inserts rows built from PersistedMessage shape', async () => {
42+
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
43+
44+
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
45+
expect(dbChainMockFns.values).toHaveBeenCalledTimes(1)
46+
const rows = dbChainMockFns.values.mock.calls[0][0]
47+
expect(rows).toHaveLength(2)
48+
49+
expect(rows[0]).toMatchObject({
50+
chatId: 'chat-1',
51+
messageId: 'msg-user-1',
52+
role: 'user',
53+
content: userMsg,
54+
model: null,
55+
streamId: null,
56+
})
57+
expect(rows[0].createdAt).toEqual(new Date(userMsg.timestamp))
58+
expect(rows[0].updatedAt).toEqual(new Date(userMsg.timestamp))
59+
60+
expect(rows[1]).toMatchObject({
61+
chatId: 'chat-1',
62+
messageId: 'msg-asst-1',
63+
role: 'assistant',
64+
content: assistantMsg,
65+
})
66+
expect(rows[1].createdAt).toEqual(new Date(assistantMsg.timestamp))
67+
})
68+
69+
it('preserves per-message ordering via timestamp', async () => {
70+
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
71+
const rows = dbChainMockFns.values.mock.calls[0][0]
72+
expect(rows[0].createdAt.getTime()).toBeLessThan(rows[1].createdAt.getTime())
73+
})
74+
75+
it('passes chatModel and streamId options to every row', async () => {
76+
await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg], {
77+
chatModel: 'claude-sonnet-4-5',
78+
streamId: 'stream-xyz',
79+
})
80+
81+
const rows = dbChainMockFns.values.mock.calls[0][0]
82+
expect(rows[0].model).toBe('claude-sonnet-4-5')
83+
expect(rows[0].streamId).toBe('stream-xyz')
84+
expect(rows[1].model).toBe('claude-sonnet-4-5')
85+
expect(rows[1].streamId).toBe('stream-xyz')
86+
})
87+
88+
it('uses ON CONFLICT DO UPDATE with chat_id + message_id target', async () => {
89+
await appendCopilotChatMessages('chat-1', [userMsg])
90+
91+
expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
92+
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
93+
expect(conflictArg.target).toHaveLength(2)
94+
expect(conflictArg.set).toHaveProperty('content')
95+
expect(conflictArg.set).toHaveProperty('role')
96+
expect(conflictArg.set).toHaveProperty('model')
97+
expect(conflictArg.set).toHaveProperty('streamId')
98+
expect(conflictArg.set).toHaveProperty('updatedAt')
99+
})
100+
101+
it('swallows DB errors so the legacy JSONB write stays canonical', async () => {
102+
dbChainMockFns.onConflictDoUpdate.mockRejectedValueOnce(new Error('connection lost'))
103+
104+
await expect(appendCopilotChatMessages('chat-1', [userMsg])).resolves.toBeUndefined()
105+
})
106+
})
107+
108+
describe('replaceCopilotChatMessages', () => {
109+
it('deletes all chat rows when given an empty snapshot', async () => {
110+
await replaceCopilotChatMessages('chat-1', [])
111+
112+
expect(dbChainMockFns.transaction).toHaveBeenCalledTimes(1)
113+
expect(dbChainMockFns.delete).toHaveBeenCalledTimes(1)
114+
expect(dbChainMockFns.insert).not.toHaveBeenCalled()
115+
})
116+
117+
it('deletes only rows whose message_id is not in the new snapshot, then upserts', async () => {
118+
await replaceCopilotChatMessages('chat-1', [userMsg, assistantMsg])
119+
120+
expect(dbChainMockFns.delete).toHaveBeenCalledTimes(1)
121+
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
122+
123+
const rows = dbChainMockFns.values.mock.calls[0][0]
124+
expect(rows).toHaveLength(2)
125+
expect(rows.map((r: { messageId: string }) => r.messageId)).toEqual([
126+
'msg-user-1',
127+
'msg-asst-1',
128+
])
129+
130+
expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
131+
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
132+
expect(conflictArg.set).toHaveProperty('streamId')
133+
expect(conflictArg.set).toHaveProperty('model')
134+
})
135+
136+
it('passes chatModel to every row in the snapshot', async () => {
137+
await replaceCopilotChatMessages('chat-1', [userMsg], {
138+
chatModel: 'gpt-4o-mini',
139+
})
140+
141+
const rows = dbChainMockFns.values.mock.calls[0][0]
142+
expect(rows[0].model).toBe('gpt-4o-mini')
143+
})
144+
145+
it('swallows DB errors so the legacy JSONB write stays canonical', async () => {
146+
dbChainMockFns.transaction.mockRejectedValueOnce(new Error('tx aborted'))
147+
148+
await expect(replaceCopilotChatMessages('chat-1', [userMsg])).resolves.toBeUndefined()
149+
})
150+
})
151+
})

0 commit comments

Comments
 (0)