Skip to content

Commit 381bc1d

Browse files
committed
fix(concurrency): cleanup worker code
1 parent 20c0564 commit 381bc1d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+159
-5854
lines changed

README.md

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,6 @@ docker compose -f docker-compose.prod.yml up -d
7474

7575
Open [http://localhost:3000](http://localhost:3000)
7676

77-
#### Background worker note
78-
79-
The Docker Compose stack starts a dedicated worker container by default. If `REDIS_URL` is not configured, the worker will start, log that it is idle, and do no queue processing. This is expected. Queue-backed API, webhook, and schedule execution requires Redis; installs without Redis continue to use the inline execution path.
80-
8177
Sim also supports local models via [Ollama](https://ollama.ai) and [vLLM](https://docs.vllm.ai/) — see the [Docker self-hosting docs](https://docs.sim.ai/self-hosting/docker) for setup details.
8278

8379
### Self-hosted: Manual Setup
@@ -117,12 +113,10 @@ cd packages/db && bunx drizzle-kit migrate --config=./drizzle.config.ts
117113
5. Start development servers:
118114

119115
```bash
120-
bun run dev:full # Starts Next.js app, realtime socket server, and the BullMQ worker
116+
bun run dev:full # Starts Next.js app and realtime socket server
121117
```
122118

123-
If `REDIS_URL` is not configured, the worker will remain idle and execution continues inline.
124-
125-
Or run separately: `bun run dev` (Next.js), `cd apps/sim && bun run dev:sockets` (realtime), and `cd apps/sim && bun run worker` (BullMQ worker).
119+
Or run separately: `bun run dev` (Next.js) and `cd apps/sim && bun run dev:sockets` (realtime).
126120

127121
## Copilot API Keys
128122

apps/docs/content/docs/en/execution/costs.mdx

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -195,17 +195,6 @@ By default, your usage is capped at the credits included in your plan. To allow
195195

196196
Max (individual) shares the same rate limits as team plans. Team plans (Pro or Max for Teams) use the Max-tier rate limits.
197197

198-
### Concurrent Execution Limits
199-
200-
| Plan | Concurrent Executions |
201-
|------|----------------------|
202-
| **Free** | 5 |
203-
| **Pro** | 50 |
204-
| **Max / Team** | 200 |
205-
| **Enterprise** | 200 (customizable) |
206-
207-
Concurrent execution limits control how many workflow executions can run simultaneously within a workspace. When the limit is reached, new executions are queued and admitted as running executions complete. Manual runs from the editor are not subject to these limits.
208-
209198
### File Storage
210199

211200
| Plan | Storage |

apps/sim/app/api/jobs/[jobId]/route.test.ts

Lines changed: 55 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,13 @@
44
import type { NextRequest } from 'next/server'
55
import { beforeEach, describe, expect, it, vi } from 'vitest'
66

7-
const {
8-
mockCheckHybridAuth,
9-
mockGetDispatchJobRecord,
10-
mockGetJobQueue,
11-
mockVerifyWorkflowAccess,
12-
mockGetWorkflowById,
13-
} = vi.hoisted(() => ({
14-
mockCheckHybridAuth: vi.fn(),
15-
mockGetDispatchJobRecord: vi.fn(),
16-
mockGetJobQueue: vi.fn(),
17-
mockVerifyWorkflowAccess: vi.fn(),
18-
mockGetWorkflowById: vi.fn(),
19-
}))
7+
const { mockCheckHybridAuth, mockGetJobQueue, mockVerifyWorkflowAccess, mockGetWorkflowById } =
8+
vi.hoisted(() => ({
9+
mockCheckHybridAuth: vi.fn(),
10+
mockGetJobQueue: vi.fn(),
11+
mockVerifyWorkflowAccess: vi.fn(),
12+
mockGetWorkflowById: vi.fn(),
13+
}))
2014

2115
vi.mock('@sim/logger', () => ({
2216
createLogger: () => ({
@@ -32,19 +26,9 @@ vi.mock('@/lib/auth/hybrid', () => ({
3226
}))
3327

3428
vi.mock('@/lib/core/async-jobs', () => ({
35-
JOB_STATUS: {
36-
PENDING: 'pending',
37-
PROCESSING: 'processing',
38-
COMPLETED: 'completed',
39-
FAILED: 'failed',
40-
},
4129
getJobQueue: mockGetJobQueue,
4230
}))
4331

44-
vi.mock('@/lib/core/workspace-dispatch/store', () => ({
45-
getDispatchJobRecord: mockGetDispatchJobRecord,
46-
}))
47-
4832
vi.mock('@/lib/core/utils/request', () => ({
4933
generateRequestId: vi.fn().mockReturnValue('request-1'),
5034
}))
@@ -89,72 +73,78 @@ describe('GET /api/jobs/[jobId]', () => {
8973
})
9074
})
9175

92-
it('returns dispatcher-aware waiting status with metadata', async () => {
93-
mockGetDispatchJobRecord.mockResolvedValue({
94-
id: 'dispatch-1',
95-
workspaceId: 'workspace-1',
96-
lane: 'runtime',
97-
queueName: 'workflow-execution',
98-
bullmqJobName: 'workflow-execution',
99-
bullmqPayload: {},
100-
metadata: {
101-
workflowId: 'workflow-1',
102-
},
103-
priority: 10,
104-
status: 'waiting',
105-
createdAt: 1000,
106-
admittedAt: 2000,
76+
it('returns pending status for a queued job', async () => {
77+
mockGetJobQueue.mockResolvedValue({
78+
getJob: vi.fn().mockResolvedValue({
79+
id: 'job-1',
80+
type: 'workflow-execution',
81+
payload: {},
82+
status: 'pending',
83+
createdAt: new Date('2025-01-01T00:00:00Z'),
84+
attempts: 0,
85+
maxAttempts: 1,
86+
metadata: {
87+
workflowId: 'workflow-1',
88+
},
89+
}),
10790
})
10891

10992
const response = await GET(createMockRequest(), {
110-
params: Promise.resolve({ jobId: 'dispatch-1' }),
93+
params: Promise.resolve({ jobId: 'job-1' }),
11194
})
11295
const body = await response.json()
11396

11497
expect(response.status).toBe(200)
115-
expect(body.status).toBe('waiting')
116-
expect(body.metadata.queueName).toBe('workflow-execution')
117-
expect(body.metadata.lane).toBe('runtime')
118-
expect(body.metadata.workspaceId).toBe('workspace-1')
98+
expect(body.status).toBe('pending')
11999
})
120100

121-
it('returns completed output from dispatch state', async () => {
122-
mockGetDispatchJobRecord.mockResolvedValue({
123-
id: 'dispatch-2',
124-
workspaceId: 'workspace-1',
125-
lane: 'interactive',
126-
queueName: 'workflow-execution',
127-
bullmqJobName: 'direct-workflow-execution',
128-
bullmqPayload: {},
129-
metadata: {
130-
workflowId: 'workflow-1',
131-
},
132-
priority: 1,
133-
status: 'completed',
134-
createdAt: 1000,
135-
startedAt: 2000,
136-
completedAt: 7000,
137-
output: { success: true },
101+
it('returns completed output from job', async () => {
102+
mockGetJobQueue.mockResolvedValue({
103+
getJob: vi.fn().mockResolvedValue({
104+
id: 'job-2',
105+
type: 'workflow-execution',
106+
payload: {},
107+
status: 'completed',
108+
createdAt: new Date('2025-01-01T00:00:00Z'),
109+
startedAt: new Date('2025-01-01T00:00:01Z'),
110+
completedAt: new Date('2025-01-01T00:00:06Z'),
111+
attempts: 1,
112+
maxAttempts: 1,
113+
output: { success: true },
114+
metadata: {
115+
workflowId: 'workflow-1',
116+
},
117+
}),
138118
})
139119

140120
const response = await GET(createMockRequest(), {
141-
params: Promise.resolve({ jobId: 'dispatch-2' }),
121+
params: Promise.resolve({ jobId: 'job-2' }),
142122
})
143123
const body = await response.json()
144124

145125
expect(response.status).toBe(200)
146126
expect(body.status).toBe('completed')
147127
expect(body.output).toEqual({ success: true })
148-
expect(body.metadata.duration).toBe(5000)
149128
})
150129

151-
it('returns 404 when neither dispatch nor BullMQ job exists', async () => {
152-
mockGetDispatchJobRecord.mockResolvedValue(null)
153-
130+
it('returns 404 when job does not exist', async () => {
154131
const response = await GET(createMockRequest(), {
155132
params: Promise.resolve({ jobId: 'missing-job' }),
156133
})
157134

158135
expect(response.status).toBe(404)
159136
})
137+
138+
it('returns 401 for unauthenticated requests', async () => {
139+
mockCheckHybridAuth.mockResolvedValue({
140+
success: false,
141+
error: 'Not authenticated',
142+
})
143+
144+
const response = await GET(createMockRequest(), {
145+
params: Promise.resolve({ jobId: 'job-1' }),
146+
})
147+
148+
expect(response.status).toBe(401)
149+
})
160150
})

apps/sim/app/api/jobs/[jobId]/route.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,27 @@ import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { checkHybridAuth } from '@/lib/auth/hybrid'
44
import { getJobQueue } from '@/lib/core/async-jobs'
5+
import type { Job } from '@/lib/core/async-jobs/types'
56
import { generateRequestId } from '@/lib/core/utils/request'
6-
import { presentDispatchOrJobStatus } from '@/lib/core/workspace-dispatch/status'
7-
import { getDispatchJobRecord } from '@/lib/core/workspace-dispatch/store'
87
import { createErrorResponse } from '@/app/api/workflows/utils'
98

109
const logger = createLogger('TaskStatusAPI')
1110

11+
function presentJobStatus(job: Job) {
12+
return {
13+
status: job.status,
14+
metadata: {
15+
createdAt: job.createdAt.toISOString(),
16+
startedAt: job.startedAt?.toISOString(),
17+
completedAt: job.completedAt?.toISOString(),
18+
attempts: job.attempts,
19+
maxAttempts: job.maxAttempts,
20+
},
21+
output: job.output,
22+
error: job.error,
23+
}
24+
}
25+
1226
export async function GET(
1327
request: NextRequest,
1428
{ params }: { params: Promise<{ jobId: string }> }
@@ -25,15 +39,14 @@ export async function GET(
2539

2640
const authenticatedUserId = authResult.userId
2741

28-
const dispatchJob = await getDispatchJobRecord(taskId)
2942
const jobQueue = await getJobQueue()
30-
const job = dispatchJob ? null : await jobQueue.getJob(taskId)
43+
const job = await jobQueue.getJob(taskId)
3144

32-
if (!job && !dispatchJob) {
45+
if (!job) {
3346
return createErrorResponse('Task not found', 404)
3447
}
3548

36-
const metadataToCheck = dispatchJob?.metadata ?? job?.metadata
49+
const metadataToCheck = job.metadata
3750

3851
if (metadataToCheck?.workflowId) {
3952
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
@@ -61,7 +74,7 @@ export async function GET(
6174
return createErrorResponse('Access denied', 403)
6275
}
6376

64-
const presented = presentDispatchOrJobStatus(dispatchJob, job)
77+
const presented = presentJobStatus(job)
6578
const response: any = {
6679
success: true,
6780
taskId,
@@ -71,9 +84,6 @@ export async function GET(
7184

7285
if (presented.output !== undefined) response.output = presented.output
7386
if (presented.error !== undefined) response.error = presented.error
74-
if (presented.estimatedDuration !== undefined) {
75-
response.estimatedDuration = presented.estimatedDuration
76-
}
7787

7888
return NextResponse.json(response)
7989
} catch (error: any) {

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ const {
1414
mockDbReturning,
1515
mockDbUpdate,
1616
mockEnqueue,
17-
mockEnqueueWorkspaceDispatch,
1817
mockStartJob,
1918
mockCompleteJob,
2019
mockMarkJobFailed,
@@ -24,7 +23,6 @@ const {
2423
const mockDbSet = vi.fn().mockReturnValue({ where: mockDbWhere })
2524
const mockDbUpdate = vi.fn().mockReturnValue({ set: mockDbSet })
2625
const mockEnqueue = vi.fn().mockResolvedValue('job-id-1')
27-
const mockEnqueueWorkspaceDispatch = vi.fn().mockResolvedValue('job-id-1')
2826
const mockStartJob = vi.fn().mockResolvedValue(undefined)
2927
const mockCompleteJob = vi.fn().mockResolvedValue(undefined)
3028
const mockMarkJobFailed = vi.fn().mockResolvedValue(undefined)
@@ -42,7 +40,6 @@ const {
4240
mockDbReturning,
4341
mockDbUpdate,
4442
mockEnqueue,
45-
mockEnqueueWorkspaceDispatch,
4643
mockStartJob,
4744
mockCompleteJob,
4845
mockMarkJobFailed,
@@ -75,15 +72,6 @@ vi.mock('@/lib/core/async-jobs', () => ({
7572
shouldExecuteInline: vi.fn().mockReturnValue(false),
7673
}))
7774

78-
vi.mock('@/lib/core/bullmq', () => ({
79-
isBullMQEnabled: vi.fn().mockReturnValue(true),
80-
createBullMQJobData: vi.fn((payload: unknown) => ({ payload })),
81-
}))
82-
83-
vi.mock('@/lib/core/workspace-dispatch', () => ({
84-
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
85-
}))
86-
8775
vi.mock('@/lib/workflows/utils', () => ({
8876
getWorkflowById: vi.fn().mockResolvedValue({
8977
id: 'workflow-1',
@@ -246,29 +234,19 @@ describe('Scheduled Workflow Execution API Route', () => {
246234
expect(data).toHaveProperty('executedCount', 2)
247235
})
248236

249-
it('should queue mothership jobs to BullMQ when available', async () => {
237+
it('should execute mothership jobs inline', async () => {
250238
mockDbReturning.mockReturnValueOnce([]).mockReturnValueOnce(SINGLE_JOB)
251239

252240
const response = await GET(createMockRequest())
253241

254242
expect(response.status).toBe(200)
255-
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
243+
expect(mockExecuteJobInline).toHaveBeenCalledWith(
256244
expect.objectContaining({
257-
workspaceId: 'workspace-1',
258-
lane: 'runtime',
259-
queueName: 'mothership-job-execution',
260-
bullmqJobName: 'mothership-job-execution',
261-
bullmqPayload: {
262-
payload: {
263-
scheduleId: 'job-1',
264-
cronExpression: '0 * * * *',
265-
failedCount: 0,
266-
now: expect.any(String),
267-
},
268-
},
245+
scheduleId: 'job-1',
246+
cronExpression: '0 * * * *',
247+
failedCount: 0,
269248
})
270249
)
271-
expect(mockExecuteJobInline).not.toHaveBeenCalled()
272250
})
273251

274252
it('should enqueue preassigned correlation metadata for schedules', async () => {
@@ -277,25 +255,23 @@ describe('Scheduled Workflow Execution API Route', () => {
277255
const response = await GET(createMockRequest())
278256

279257
expect(response.status).toBe(200)
280-
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
258+
expect(mockEnqueue).toHaveBeenCalledWith(
259+
'schedule-execution',
260+
expect.objectContaining({
261+
scheduleId: 'schedule-1',
262+
workflowId: 'workflow-1',
263+
executionId: 'schedule-execution-1',
264+
}),
281265
expect.objectContaining({
282-
id: 'schedule-execution-1',
283-
workspaceId: 'workspace-1',
284-
lane: 'runtime',
285-
queueName: 'schedule-execution',
286-
bullmqJobName: 'schedule-execution',
287-
metadata: {
266+
metadata: expect.objectContaining({
288267
workflowId: 'workflow-1',
289-
correlation: {
268+
correlation: expect.objectContaining({
290269
executionId: 'schedule-execution-1',
291270
requestId: 'test-request-id',
292271
source: 'schedule',
293272
workflowId: 'workflow-1',
294-
scheduleId: 'schedule-1',
295-
triggerType: 'schedule',
296-
scheduledFor: '2025-01-01T00:00:00.000Z',
297-
},
298-
},
273+
}),
274+
}),
299275
})
300276
)
301277
})

0 commit comments

Comments
 (0)