11import { db } from '@sim/db'
2- import { jobExecutionLogs , workflowExecutionLogs } from '@sim/db/schema'
2+ import { jobExecutionLogs , pausedExecutions , workflowExecutionLogs } from '@sim/db/schema'
33import { createLogger } from '@sim/logger'
44import { task } from '@trigger.dev/sdk'
5- import { and , inArray , lt } from 'drizzle-orm'
5+ import { and , eq , inArray , isNull , lt , notInArray , or , sql } from 'drizzle-orm'
66import { type CleanupJobPayload , resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
77import {
88 batchDeleteByWorkspaceAndTimestamp ,
99 chunkedBatchDelete ,
1010 type TableCleanupResult ,
1111} from '@/lib/cleanup/batch-delete'
12+ import { collectLargeValueKeys } from '@/lib/execution/payloads/large-execution-value'
1213import { snapshotService } from '@/lib/logs/execution/snapshot/service'
1314import { isUsingCloudStorage , StorageService } from '@/lib/uploads'
1415import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
@@ -19,6 +20,38 @@ interface FileDeleteStats {
1920 filesTotal : number
2021 filesDeleted : number
2122 filesDeleteFailed : number
23+ largeValuesTotal : number
24+ largeValuesDeleted : number
25+ largeValuesDeleteFailed : number
26+ }
27+
28+ const RESUMABLE_PAUSED_STATUSES = [ 'paused' , 'partially_resumed' , 'cancelling' ]
29+
30+ async function filterLargeValueKeysWithoutRetainedReferences (
31+ keys : string [ ] ,
32+ deletedLogIds : string [ ]
33+ ) : Promise < string [ ] > {
34+ if ( keys . length === 0 || deletedLogIds . length === 0 ) return [ ]
35+
36+ const unreferencedKeys : string [ ] = [ ]
37+ for ( const key of Array . from ( new Set ( keys ) ) ) {
38+ const [ referencingLog ] = await db
39+ . select ( { id : workflowExecutionLogs . id } )
40+ . from ( workflowExecutionLogs )
41+ . where (
42+ and (
43+ notInArray ( workflowExecutionLogs . id , deletedLogIds ) ,
44+ sql `position(${ key } in ${ workflowExecutionLogs . executionData } ::text) > 0`
45+ )
46+ )
47+ . limit ( 1 )
48+
49+ if ( ! referencingLog ) {
50+ unreferencedKeys . push ( key )
51+ }
52+ }
53+
54+ return unreferencedKeys
2255}
2356
2457async function deleteExecutionFiles ( files : unknown , stats : FileDeleteStats ) : Promise < void > {
@@ -41,36 +74,103 @@ async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Pro
4174 )
4275}
4376
77+ async function deleteLargeValueStorageKeys ( keys : string [ ] , stats : FileDeleteStats ) : Promise < void > {
78+ if ( ! isUsingCloudStorage ( ) || keys . length === 0 ) return
79+
80+ const uniqueKeys = Array . from ( new Set ( keys ) )
81+ stats . largeValuesTotal += uniqueKeys . length
82+
83+ await Promise . all (
84+ uniqueKeys . map ( async ( key ) => {
85+ try {
86+ await StorageService . deleteFile ( { key, context : 'execution' } )
87+ await deleteFileMetadata ( key )
88+ stats . largeValuesDeleted ++
89+ } catch ( error ) {
90+ stats . largeValuesDeleteFailed ++
91+ logger . error ( `Failed to delete large execution value ${ key } :` , { error } )
92+ }
93+ } )
94+ )
95+ }
96+
4497async function cleanupWorkflowExecutionLogs (
4598 workspaceIds : string [ ] ,
4699 retentionDate : Date ,
47100 label : string
48101) : Promise < TableCleanupResult & FileDeleteStats > {
49- const fileStats : FileDeleteStats = { filesTotal : 0 , filesDeleted : 0 , filesDeleteFailed : 0 }
102+ const fileStats : FileDeleteStats = {
103+ filesTotal : 0 ,
104+ filesDeleted : 0 ,
105+ filesDeleteFailed : 0 ,
106+ largeValuesTotal : 0 ,
107+ largeValuesDeleted : 0 ,
108+ largeValuesDeleteFailed : 0 ,
109+ }
50110
51111 const dbStats = await chunkedBatchDelete ( {
52112 tableDef : workflowExecutionLogs ,
53113 workspaceIds,
54114 tableName : `${ label } /workflow_execution_logs` ,
55115 selectChunk : ( chunkIds , limit ) =>
56116 db
57- . select ( { id : workflowExecutionLogs . id , files : workflowExecutionLogs . files } )
117+ . select ( {
118+ id : workflowExecutionLogs . id ,
119+ executionId : workflowExecutionLogs . executionId ,
120+ executionData : workflowExecutionLogs . executionData ,
121+ files : workflowExecutionLogs . files ,
122+ } )
58123 . from ( workflowExecutionLogs )
124+ . leftJoin (
125+ pausedExecutions ,
126+ eq ( pausedExecutions . executionId , workflowExecutionLogs . executionId )
127+ )
59128 . where (
60129 and (
61130 inArray ( workflowExecutionLogs . workspaceId , chunkIds ) ,
62- lt ( workflowExecutionLogs . startedAt , retentionDate )
131+ lt ( workflowExecutionLogs . startedAt , retentionDate ) ,
132+ or (
133+ isNull ( pausedExecutions . status ) ,
134+ notInArray ( pausedExecutions . status , RESUMABLE_PAUSED_STATUSES )
135+ )
63136 )
64137 )
65138 . limit ( limit ) ,
66139 onBatch : async ( rows ) => {
67- for ( const row of rows ) await deleteExecutionFiles ( row . files , fileStats )
140+ const deletedLogIds = rows . map ( ( row ) => row . id )
141+ const largeValueKeys = rows . flatMap ( ( row ) => collectLargeValueKeys ( row . executionData ) )
142+ const unreferencedLargeValueKeys = await filterLargeValueKeysWithoutRetainedReferences (
143+ largeValueKeys ,
144+ deletedLogIds
145+ )
146+
147+ for ( const row of rows ) {
148+ await deleteExecutionFiles ( row . files , fileStats )
149+ }
150+ await deleteLargeValueStorageKeys ( unreferencedLargeValueKeys , fileStats )
68151 } ,
69152 } )
70153
71154 return { ...dbStats , ...fileStats }
72155}
73156
157+ async function cleanupFreePlanOrphanedSnapshots (
158+ payload : CleanupJobPayload ,
159+ retentionHours : number
160+ ) : Promise < void > {
161+ if ( payload . plan !== 'free' ) {
162+ return
163+ }
164+
165+ try {
166+ const retentionDays = Math . floor ( retentionHours / 24 )
167+ const snapshotsCleaned = await snapshotService . cleanupOrphanedSnapshots ( retentionDays + 1 )
168+ logger . info ( `Cleaned up ${ snapshotsCleaned } orphaned snapshots` )
169+ } catch ( snapshotError ) {
170+ logger . error ( 'Error cleaning up orphaned snapshots:' , { snapshotError } )
171+ }
172+ }
173+
74174export async function runCleanupLogs ( payload : CleanupJobPayload ) : Promise < void > {
75175 const startTime = Date . now ( )
76176
@@ -82,12 +182,14 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
82182
83183 const { workspaceIds, retentionHours, label } = scope
84184
185+ const retentionDate = new Date ( Date . now ( ) - retentionHours * 60 * 60 * 1000 )
186+
85187 if ( workspaceIds . length === 0 ) {
86188 logger . info ( `[${ label } ] No workspaces to process` )
189+ await cleanupFreePlanOrphanedSnapshots ( payload , retentionHours )
87190 return
88191 }
89192
90- const retentionDate = new Date ( Date . now ( ) - retentionHours * 60 * 60 * 1000 )
91193 logger . info (
92194 `[${ label } ] Cleaning ${ workspaceIds . length } workspaces, cutoff: ${ retentionDate . toISOString ( ) } `
93195 )
@@ -96,6 +198,9 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
96198 logger . info (
97199 `[${ label } ] workflow_execution_logs files: ${ workflowResults . filesDeleted } /${ workflowResults . filesTotal } deleted, ${ workflowResults . filesDeleteFailed } failed`
98200 )
201+ logger . info (
202+ `[${ label } ] workflow_execution_logs large values: ${ workflowResults . largeValuesDeleted } /${ workflowResults . largeValuesTotal } deleted, ${ workflowResults . largeValuesDeleteFailed } failed`
203+ )
99204
100205 await batchDeleteByWorkspaceAndTimestamp ( {
101206 tableDef : jobExecutionLogs ,
@@ -106,16 +211,7 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
106211 tableName : `${ label } /job_execution_logs` ,
107212 } )
108213
109- // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
110- if ( payload . plan === 'free' ) {
111- try {
112- const retentionDays = Math . floor ( retentionHours / 24 )
113- const snapshotsCleaned = await snapshotService . cleanupOrphanedSnapshots ( retentionDays + 1 )
114- logger . info ( `Cleaned up ${ snapshotsCleaned } orphaned snapshots` )
115- } catch ( snapshotError ) {
116- logger . error ( 'Error cleaning up orphaned snapshots:' , { snapshotError } )
117- }
118- }
214+ await cleanupFreePlanOrphanedSnapshots ( payload , retentionHours )
119215
120216 const timeElapsed = ( Date . now ( ) - startTime ) / 1000
121217 logger . info ( `[${ label } ] Job completed in ${ timeElapsed . toFixed ( 2 ) } s` )
0 commit comments