From 62558381f73acf173a138f276140626be686d9cc Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Fri, 3 Apr 2026 17:38:31 -0700 Subject: [PATCH 1/3] improvement(mothership): workflow edits via sockets --- .../workspace/providers/socket-provider.tsx | 14 ++++ apps/sim/hooks/use-collaborative-workflow.ts | 64 +++++++++++++++++++ .../server/workflow/edit-workflow/index.ts | 15 +++++ 3 files changed, 93 insertions(+) diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 6a27bd3a664..26674bb2e9d 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -90,6 +90,7 @@ interface SocketContextType { onSelectionUpdate: (handler: (data: any) => void) => void onWorkflowDeleted: (handler: (data: any) => void) => void onWorkflowReverted: (handler: (data: any) => void) => void + onWorkflowUpdated: (handler: (data: any) => void) => void onOperationConfirmed: (handler: (data: any) => void) => void onOperationFailed: (handler: (data: any) => void) => void } @@ -118,6 +119,7 @@ const SocketContext = createContext({ onSelectionUpdate: () => {}, onWorkflowDeleted: () => {}, onWorkflowReverted: () => {}, + onWorkflowUpdated: () => {}, onOperationConfirmed: () => {}, onOperationFailed: () => {}, }) @@ -155,6 +157,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { selectionUpdate?: (data: any) => void workflowDeleted?: (data: any) => void workflowReverted?: (data: any) => void + workflowUpdated?: (data: any) => void operationConfirmed?: (data: any) => void operationFailed?: (data: any) => void }>({}) @@ -382,6 +385,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) { eventHandlers.current.workflowReverted?.(data) }) + socketInstance.on('workflow-updated', (data) => { + logger.info(`Workflow ${data.workflowId} has been updated externally`) + eventHandlers.current.workflowUpdated?.(data) + }) + const rehydrateWorkflowStores = async (workflowId: string, workflowState: any) => { const [ { useOperationQueueStore }, @@ -804,6 +812,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) { eventHandlers.current.workflowReverted = handler }, []) + const onWorkflowUpdated = useCallback((handler: (data: any) => void) => { + eventHandlers.current.workflowUpdated = handler + }, []) + const onOperationConfirmed = useCallback((handler: (data: any) => void) => { eventHandlers.current.operationConfirmed = handler }, []) @@ -837,6 +849,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { onSelectionUpdate, onWorkflowDeleted, onWorkflowReverted, + onWorkflowUpdated, onOperationConfirmed, onOperationFailed, }), @@ -864,6 +877,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { onSelectionUpdate, onWorkflowDeleted, onWorkflowReverted, + onWorkflowUpdated, onOperationConfirmed, onOperationFailed, ] diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index 4093bed8b20..1c888a84209 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -122,6 +122,7 @@ export function useCollaborativeWorkflow() { onVariableUpdate, onWorkflowDeleted, onWorkflowReverted, + onWorkflowUpdated, onOperationConfirmed, onOperationFailed, } = useSocket() @@ -615,6 +616,67 @@ export function useCollaborativeWorkflow() { } } + const handleWorkflowUpdated = async (data: any) => { + const { workflowId } = data + logger.info(`Workflow ${workflowId} has been updated externally`) + + if (activeWorkflowId !== workflowId) return + + const { hasActiveDiff } = useWorkflowDiffStore.getState() + if (hasActiveDiff) { + logger.info('Skipping workflow-updated: active diff in progress', { workflowId }) + return + } + + try { + const response = await fetch(`/api/workflows/${workflowId}`) + if (response.ok) { + const responseData = await response.json() + const workflowData = responseData.data + + if (workflowData?.state) { + isApplyingRemoteChange.current = true + try { + useWorkflowStore.getState().replaceWorkflowState({ + blocks: workflowData.state.blocks || {}, + edges: workflowData.state.edges || [], + loops: workflowData.state.loops || {}, + parallels: workflowData.state.parallels || {}, + lastSaved: workflowData.state.lastSaved || Date.now(), + deploymentStatuses: workflowData.state.deploymentStatuses || {}, + }) + + const subblockValues: Record> = {} + Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => { + const blockState = block as any + subblockValues[blockId] = {} + Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => { + subblockValues[blockId][subblockId] = (subblock as any).value + }) + }) + + useSubBlockStore.setState((state: any) => ({ + workflowValues: { + ...state.workflowValues, + [workflowId]: subblockValues, + }, + })) + + logger.info(`Successfully applied externally updated workflow state`, { workflowId }) + } finally { + isApplyingRemoteChange.current = false + } + } + } else { + logger.error( + `Failed to fetch workflow data after external update: ${response.statusText}` + ) + } + } catch (error) { + logger.error('Error reloading workflow state after external update:', error) + } + } + const handleOperationConfirmed = (data: any) => { const { operationId } = data logger.debug('Operation confirmed', { operationId }) @@ -633,6 +695,7 @@ export function useCollaborativeWorkflow() { onVariableUpdate(handleVariableUpdate) onWorkflowDeleted(handleWorkflowDeleted) onWorkflowReverted(handleWorkflowReverted) + onWorkflowUpdated(handleWorkflowUpdated) onOperationConfirmed(handleOperationConfirmed) onOperationFailed(handleOperationFailed) }, [ @@ -641,6 +704,7 @@ export function useCollaborativeWorkflow() { onVariableUpdate, onWorkflowDeleted, onWorkflowReverted, + onWorkflowUpdated, onOperationConfirmed, onOperationFailed, activeWorkflowId, diff --git a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts index eb0a0f23ed6..e1824f07a28 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts @@ -7,6 +7,7 @@ import { type BaseServerTool, type ServerToolContext, } from '@/lib/copilot/tools/server/base-tool' +import { env } from '@/lib/core/config/env' import { applyTargetedLayout, getTargetedLayoutImpact } from '@/lib/workflows/autolayout' import { DEFAULT_HORIZONTAL_SPACING, @@ -287,6 +288,20 @@ export const editWorkflowServerTool: BaseServerTool logger.info('Workflow state persisted to database', { workflowId }) + try { + const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002' + await fetch(`${socketUrl}/api/workflow-updated`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': env.INTERNAL_API_SECRET, + }, + body: JSON.stringify({ workflowId }), + }) + } catch (error) { + logger.warn('Failed to notify socket server of workflow update', { workflowId, error }) + } + const sanitizationWarnings = validation.warnings.length > 0 ? validation.warnings : undefined return { From 79476dd8f5a491ec7104964f6873b98e32c99136 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Fri, 3 Apr 2026 18:27:50 -0700 Subject: [PATCH 2/3] make embedded view join room --- .../[workspaceId]/w/[workflowId]/workflow.tsx | 10 +- .../workspace/providers/socket-provider.tsx | 2 +- apps/sim/hooks/use-collaborative-workflow.ts | 181 +++++++----------- .../server/workflow/edit-workflow/index.ts | 22 +-- 4 files changed, 89 insertions(+), 126 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx index 57fd7a41706..5e69dfc53c2 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx @@ -265,7 +265,7 @@ const WorkflowContent = React.memo( const { fitViewToBounds, getViewportCenter } = useCanvasViewport(reactFlowInstance, { embedded, }) - const { emitCursorUpdate } = useSocket() + const { emitCursorUpdate, joinWorkflow, leaveWorkflow } = useSocket() useDynamicHandleRefresh() const workspaceId = propWorkspaceId || (params.workspaceId as string) @@ -273,6 +273,14 @@ const WorkflowContent = React.memo( const addNotification = useNotificationStore((state) => state.addNotification) + useEffect(() => { + if (!embedded || !workflowIdParam) return + joinWorkflow(workflowIdParam) + return () => { + leaveWorkflow() + } + }, [embedded, workflowIdParam, joinWorkflow, leaveWorkflow]) + useOAuthReturnForWorkflow(workflowIdParam) const { diff --git a/apps/sim/app/workspace/providers/socket-provider.tsx b/apps/sim/app/workspace/providers/socket-provider.tsx index 26674bb2e9d..cb1c090eb00 100644 --- a/apps/sim/app/workspace/providers/socket-provider.tsx +++ b/apps/sim/app/workspace/providers/socket-provider.tsx @@ -337,7 +337,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) { socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => { isRejoiningRef.current = false // Ignore stale success responses from previous navigation - if (workflowId !== urlWorkflowIdRef.current) { + if (urlWorkflowIdRef.current && workflowId !== urlWorkflowIdRef.current) { logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`) return } diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index 1c888a84209..b90a39ba618 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -537,82 +537,81 @@ export function useCollaborativeWorkflow() { } } - const handleWorkflowReverted = async (data: any) => { - const { workflowId } = data - logger.info(`Workflow ${workflowId} has been reverted to deployed state`) + const reloadWorkflowFromApi = async (workflowId: string, reason: string): Promise => { + const response = await fetch(`/api/workflows/${workflowId}`) + if (!response.ok) { + logger.error(`Failed to fetch workflow data after ${reason}: ${response.statusText}`) + return false + } - // If the reverted workflow is the currently active one, reload the workflow state - if (activeWorkflowId === workflowId) { - logger.info(`Currently active workflow ${workflowId} was reverted, reloading state`) - - try { - // Fetch the updated workflow state from the server (which loads from normalized tables) - const response = await fetch(`/api/workflows/${workflowId}`) - if (response.ok) { - const responseData = await response.json() - const workflowData = responseData.data - - if (workflowData?.state) { - // Update the workflow store with the reverted state - isApplyingRemoteChange.current = true - try { - // Update the main workflow state using the API response - useWorkflowStore.getState().replaceWorkflowState({ - blocks: workflowData.state.blocks || {}, - edges: workflowData.state.edges || [], - loops: workflowData.state.loops || {}, - parallels: workflowData.state.parallels || {}, - lastSaved: workflowData.state.lastSaved || Date.now(), - deploymentStatuses: workflowData.state.deploymentStatuses || {}, - }) + const responseData = await response.json() + const workflowData = responseData.data - // Update subblock store with reverted values - const subblockValues: Record> = {} - Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => { - const blockState = block as any - subblockValues[blockId] = {} - Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => { - subblockValues[blockId][subblockId] = (subblock as any).value - }) - }) + if (!workflowData?.state) { + logger.error(`No state found in workflow data after ${reason}`, { workflowData }) + return false + } - // Update subblock store for this workflow - useSubBlockStore.setState((state: any) => ({ - workflowValues: { - ...state.workflowValues, - [workflowId]: subblockValues, - }, - })) + isApplyingRemoteChange.current = true + try { + useWorkflowStore.getState().replaceWorkflowState({ + blocks: workflowData.state.blocks || {}, + edges: workflowData.state.edges || [], + loops: workflowData.state.loops || {}, + parallels: workflowData.state.parallels || {}, + lastSaved: workflowData.state.lastSaved || Date.now(), + deploymentStatuses: workflowData.state.deploymentStatuses || {}, + }) - logger.info(`Successfully loaded reverted workflow state for ${workflowId}`) + const subblockValues: Record> = {} + Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => { + const blockState = block as any + subblockValues[blockId] = {} + Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => { + subblockValues[blockId][subblockId] = (subblock as any).value + }) + }) - const graph = { - blocksById: workflowData.state.blocks || {}, - edgesById: Object.fromEntries( - (workflowData.state.edges || []).map((e: any) => [e.id, e]) - ), - } + useSubBlockStore.setState((state: any) => ({ + workflowValues: { + ...state.workflowValues, + [workflowId]: subblockValues, + }, + })) - const undoRedoStore = useUndoRedoStore.getState() - const stackKeys = Object.keys(undoRedoStore.stacks) - stackKeys.forEach((key) => { - const [wfId, userId] = key.split(':') - if (wfId === workflowId) { - undoRedoStore.pruneInvalidEntries(wfId, userId, graph) - } - }) - } finally { - isApplyingRemoteChange.current = false - } - } else { - logger.error('No state found in workflow data after revert', { workflowData }) - } - } else { - logger.error(`Failed to fetch workflow data after revert: ${response.statusText}`) - } - } catch (error) { - logger.error('Error reloading workflow state after revert:', error) + const graph = { + blocksById: workflowData.state.blocks || {}, + edgesById: Object.fromEntries( + (workflowData.state.edges || []).map((e: any) => [e.id, e]) + ), } + + const undoRedoStore = useUndoRedoStore.getState() + const stackKeys = Object.keys(undoRedoStore.stacks) + stackKeys.forEach((key) => { + const [wfId, userId] = key.split(':') + if (wfId === workflowId) { + undoRedoStore.pruneInvalidEntries(wfId, userId, graph) + } + }) + + logger.info(`Successfully reloaded workflow state after ${reason}`, { workflowId }) + return true + } finally { + isApplyingRemoteChange.current = false + } + } + + const handleWorkflowReverted = async (data: any) => { + const { workflowId } = data + logger.info(`Workflow ${workflowId} has been reverted to deployed state`) + + if (activeWorkflowId !== workflowId) return + + try { + await reloadWorkflowFromApi(workflowId, 'revert') + } catch (error) { + logger.error('Error reloading workflow state after revert:', error) } } @@ -629,49 +628,7 @@ export function useCollaborativeWorkflow() { } try { - const response = await fetch(`/api/workflows/${workflowId}`) - if (response.ok) { - const responseData = await response.json() - const workflowData = responseData.data - - if (workflowData?.state) { - isApplyingRemoteChange.current = true - try { - useWorkflowStore.getState().replaceWorkflowState({ - blocks: workflowData.state.blocks || {}, - edges: workflowData.state.edges || [], - loops: workflowData.state.loops || {}, - parallels: workflowData.state.parallels || {}, - lastSaved: workflowData.state.lastSaved || Date.now(), - deploymentStatuses: workflowData.state.deploymentStatuses || {}, - }) - - const subblockValues: Record> = {} - Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => { - const blockState = block as any - subblockValues[blockId] = {} - Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => { - subblockValues[blockId][subblockId] = (subblock as any).value - }) - }) - - useSubBlockStore.setState((state: any) => ({ - workflowValues: { - ...state.workflowValues, - [workflowId]: subblockValues, - }, - })) - - logger.info(`Successfully applied externally updated workflow state`, { workflowId }) - } finally { - isApplyingRemoteChange.current = false - } - } - } else { - logger.error( - `Failed to fetch workflow data after external update: ${response.statusText}` - ) - } + await reloadWorkflowFromApi(workflowId, 'external update') } catch (error) { logger.error('Error reloading workflow state after external update:', error) } diff --git a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts index e1824f07a28..7be066d2989 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow/index.ts @@ -288,19 +288,17 @@ export const editWorkflowServerTool: BaseServerTool logger.info('Workflow state persisted to database', { workflowId }) - try { - const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002' - await fetch(`${socketUrl}/api/workflow-updated`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-api-key': env.INTERNAL_API_SECRET, - }, - body: JSON.stringify({ workflowId }), - }) - } catch (error) { + const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002' + fetch(`${socketUrl}/api/workflow-updated`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': env.INTERNAL_API_SECRET, + }, + body: JSON.stringify({ workflowId }), + }).catch((error) => { logger.warn('Failed to notify socket server of workflow update', { workflowId, error }) - } + }) const sanitizationWarnings = validation.warnings.length > 0 ? validation.warnings : undefined From 93197f1715306347bec550635c8625c674e5b7ca Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Fri, 3 Apr 2026 18:35:45 -0700 Subject: [PATCH 3/3] fix cursor positioning bug --- .../workspace/[workspaceId]/w/[workflowId]/workflow.tsx | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx index 5e69dfc53c2..6d513074196 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx @@ -2152,12 +2152,9 @@ const WorkflowContent = React.memo( const handleCanvasPointerMove = useCallback( (event: React.PointerEvent) => { - const target = event.currentTarget as HTMLElement - const bounds = target.getBoundingClientRect() - const position = screenToFlowPosition({ - x: event.clientX - bounds.left, - y: event.clientY - bounds.top, + x: event.clientX, + y: event.clientY, }) emitCursorUpdate(position)