Skip to content

Commit 93f7be4

Browse files
waleedlatif1claude
andauthored
improvement(redis): strip idempotency body and cap mothership stream zsets (#4625)
* improvement(redis): strip idempotency body and cap mothership stream zsets * chore(redis): trim verbose comments on idempotency body-strip * test(buffer): pin exact ZREMRANGEBYRANK stop arg Pinning -5_001 (= -(DEFAULT_EVENT_LIMIT) - 1) so the off-by-one boundary is directly validated; expect.any(Number) would have passed a wrong formula like -eventLimit. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 8d7bbbc commit 93f7be4

3 files changed

Lines changed: 26 additions & 3 deletions

File tree

apps/sim/lib/copilot/request/session/buffer.test.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ describe('mothership-stream-outbox', () => {
149149
expect(replayed.map((entry) => entry.payload.text)).toEqual(['world'])
150150
})
151151

152-
it('does not trim active stream history while appending events', async () => {
152+
it('trims active stream history to eventLimit on every append', async () => {
153153
const cursor = await allocateCursor('stream-1')
154154

155155
await appendEvent(
@@ -163,7 +163,11 @@ describe('mothership-stream-outbox', () => {
163163
})
164164
)
165165

166-
expect(mockRedis.zremrangebyrank).not.toHaveBeenCalled()
166+
expect(mockRedis.zremrangebyrank).toHaveBeenCalledWith(
167+
'mothership_stream:stream-1:events',
168+
0,
169+
-5_001
170+
)
167171
})
168172

169173
it('clears persisted stream state during teardown cleanup', async () => {

apps/sim/lib/copilot/request/session/buffer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ export async function appendEvents(
144144
zaddArgs.push(envelope.seq, JSON.stringify(envelope))
145145
}
146146
pipeline.zadd(key, ...(zaddArgs as [number, string, ...Array<number | string>]))
147+
pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1)
147148
pipeline.expire(key, config.ttlSeconds)
148149
pipeline.set(seqKey, String(envelopes[envelopes.length - 1].seq), 'EX', config.ttlSeconds)
149150
await pipeline.exec()

apps/sim/lib/core/idempotency/service.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ export interface IdempotencyConfig {
1717
namespace?: string
1818
/** When true, failed keys are deleted rather than stored so the operation is retried on the next attempt. */
1919
retryFailures?: boolean
20+
/**
21+
* When false, only `{ success, status, error? }` is persisted — not the
22+
* operation's return value. Duplicate calls still short-circuit but
23+
* resolve to `undefined`. Use when callers don't consume the cached
24+
* body (e.g. webhook receivers, where the provider just wants a 2xx).
25+
* Defaults to true.
26+
*/
27+
storeResultBody?: boolean
2028
/**
2129
* Force a specific storage backend regardless of the environment's
2230
* auto-detection. Use `'database'` for correctness-critical flows
@@ -78,6 +86,7 @@ export class IdempotencyService {
7886
ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL,
7987
namespace: config.namespace ?? 'default',
8088
retryFailures: config.retryFailures ?? false,
89+
storeResultBody: config.storeResultBody ?? true,
8190
}
8291
this.storageMethod = config.forceStorage ?? getStorageMethod()
8392
logger.info(`IdempotencyService using ${this.storageMethod} storage`, {
@@ -442,7 +451,9 @@ export class IdempotencyService {
442451

443452
await this.storeResult(
444453
claimResult.normalizedKey,
445-
{ success: true, result, status: 'completed' },
454+
this.config.storeResultBody
455+
? { success: true, result, status: 'completed' }
456+
: { success: true, status: 'completed' },
446457
claimResult.storageMethod
447458
)
448459

@@ -511,15 +522,22 @@ export class IdempotencyService {
511522
}
512523
}
513524

525+
/**
526+
* As a webhook receiver we only need a "we saw this delivery" marker —
527+
* the provider's retry just needs a 2xx, not our cached response body.
528+
* TTL must exceed the longest provider retry window (Gmail / Pub-Sub: 7d).
529+
*/
514530
export const webhookIdempotency = new IdempotencyService({
515531
namespace: 'webhook',
516532
ttlSeconds: 60 * 60 * 24 * 7, // 7 days
533+
storeResultBody: false,
517534
})
518535

519536
export const pollingIdempotency = new IdempotencyService({
520537
namespace: 'polling',
521538
ttlSeconds: 60 * 60 * 24 * 3, // 3 days
522539
retryFailures: true,
540+
storeResultBody: false,
523541
})
524542

525543
/**

0 commit comments

Comments
 (0)