From 0365b3b03e37901cf99ac961225a9b61760feb1c Mon Sep 17 00:00:00 2001 From: Himanshu Soni Date: Sat, 31 Jan 2026 01:55:16 +0530 Subject: [PATCH] fix(executor): prevent race condition in pause persistence --- .../executor/human-in-the-loop-manager.ts | 52 +++++----- .../pause-resume-race-condition.test.ts | 94 +++++++++++++++++++ 2 files changed, 122 insertions(+), 24 deletions(-) create mode 100644 apps/sim/lib/workflows/executor/pause-resume-race-condition.test.ts diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 479ead99ac..ee1a405e23 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -121,28 +121,14 @@ export class PauseResumeManager { const now = new Date() - await db - .insert(pausedExecutions) - .values({ - id: randomUUID(), - workflowId, - executionId, - executionSnapshot: snapshotSeed, - pausePoints: pausePointsRecord, - totalPauseCount: pausePoints.length, - resumedCount: 0, - status: 'paused', - metadata: { - pauseScope: 'execution', - triggerIds: snapshotSeed.triggerIds, - executorUserId: executorUserId ?? null, - }, - pausedAt: now, - updatedAt: now, - }) - .onConflictDoUpdate({ - target: pausedExecutions.executionId, - set: { + // Wrap persistence in a transaction to prevent race conditions with concurrent resume requests + await db.transaction(async (tx) => { + await tx + .insert(pausedExecutions) + .values({ + id: randomUUID(), + workflowId, + executionId, executionSnapshot: snapshotSeed, pausePoints: pausePointsRecord, totalPauseCount: pausePoints.length, @@ -153,10 +139,28 @@ export class PauseResumeManager { triggerIds: snapshotSeed.triggerIds, executorUserId: executorUserId ?? null, }, + pausedAt: now, updatedAt: now, - }, - }) + }) + .onConflictDoUpdate({ + target: pausedExecutions.executionId, + set: { + executionSnapshot: snapshotSeed, + pausePoints: pausePointsRecord, + totalPauseCount: pausePoints.length, + resumedCount: 0, + status: 'paused', + metadata: { + pauseScope: 'execution', + triggerIds: snapshotSeed.triggerIds, + executorUserId: executorUserId ?? null, + }, + updatedAt: now, + }, + }) + }) + // Process queued resumes after transaction commits to ensure visibility await PauseResumeManager.processQueuedResumes(executionId) } diff --git a/apps/sim/lib/workflows/executor/pause-resume-race-condition.test.ts b/apps/sim/lib/workflows/executor/pause-resume-race-condition.test.ts new file mode 100644 index 0000000000..57b466b14d --- /dev/null +++ b/apps/sim/lib/workflows/executor/pause-resume-race-condition.test.ts @@ -0,0 +1,94 @@ +/** + * @vitest-environment node + * + * Tests for Issue #3081: Race Condition between pause persistence and resume requests + */ +import { databaseMock, loggerMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@sim/db', () => databaseMock) +vi.mock('@sim/logger', () => loggerMock) + +import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager' +import type { PausePoint, SerializedSnapshot } from '@/executor/types' + +describe('PauseResumeManager - Race Condition Fix (#3081)', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + const createTestSnapshot = (): SerializedSnapshot => ({ + snapshot: JSON.stringify({ + workflow: { blocks: [], connections: [] }, + state: { blockStates: {}, executedBlocks: [] }, + }), + triggerIds: [], + }) + + const createTestPausePoints = (): PausePoint[] => [ + { + contextId: 'test-context', + blockId: 'pause-block-1', + response: {}, + resumeStatus: 'paused', + snapshotReady: true, + registeredAt: new Date().toISOString(), + }, + ] + + describe('persistPauseResult', () => { + it.concurrent('should use database transaction for atomic persistence', async () => { + const mockInsert = vi.fn().mockReturnValue({ + values: vi.fn().mockReturnValue({ + onConflictDoUpdate: vi.fn().mockResolvedValue(undefined), + }), + }) + + const mockTransaction = vi.fn().mockImplementation(async (callback) => { + const mockTx = { insert: mockInsert } + return await callback(mockTx as any) + }) + + vi.mocked(databaseMock.db.transaction).mockImplementation(mockTransaction) + vi.spyOn(PauseResumeManager, 'processQueuedResumes').mockResolvedValue(undefined) + + await PauseResumeManager.persistPauseResult({ + workflowId: 'test-workflow', + executionId: 'test-execution', + pausePoints: createTestPausePoints(), + snapshotSeed: createTestSnapshot(), + executorUserId: 'test-user', + }) + + expect(mockTransaction).toHaveBeenCalledTimes(1) + expect(mockInsert).toHaveBeenCalled() + }) + + it.concurrent('should call processQueuedResumes after transaction', async () => { + const mockInsert = vi.fn().mockReturnValue({ + values: vi.fn().mockReturnValue({ + onConflictDoUpdate: vi.fn().mockResolvedValue(undefined), + }), + }) + + vi.mocked(databaseMock.db.transaction).mockImplementation(async (callback) => { + const mockTx = { insert: mockInsert } + return await callback(mockTx as any) + }) + + const processQueuedResumesSpy = vi + .spyOn(PauseResumeManager, 'processQueuedResumes') + .mockResolvedValue(undefined) + + await PauseResumeManager.persistPauseResult({ + workflowId: 'test-workflow', + executionId: 'test-execution', + pausePoints: createTestPausePoints(), + snapshotSeed: createTestSnapshot(), + executorUserId: 'test-user', + }) + + expect(processQueuedResumesSpy).toHaveBeenCalledWith('test-execution') + }) + }) +})