Skip to content

Commit 98c1520

Browse files
d-csclaude
andcommitted
chore(mollifier): address CodeRabbit review for phase-1 PR
- changeset: drop "deferred" wording — phase-1 actively dual-writes + runs the drainer ack loop. - worker.server.ts: wrap mollifier drainer init in try/catch + register SIGTERM/SIGINT handlers so the polling loop stops cleanly on shutdown. - bufferedTriggerPayload: only serialise idempotencyKeyExpiresAt when an idempotencyKey is present (avoid impossible orphan-expiry payloads). - mollifierTelemetry: narrow recordDecision reason to DecisionReason union to keep OTEL attribute cardinality bounded. - mollifierGate: rename resolveOrgFlag → resolveFlag. The underlying FeatureFlag table is global by key, so the "org" prefix was misleading; per-org gating is out of scope for phase-1. - tests: drop vi.fn mocks. mollifierGate now uses plain closure spies; mollifierTripEvaluator runs against a real MollifierBuffer backed by a redisTest container (closed client exercises the fail-open path). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 452ebda commit 98c1520

8 files changed

Lines changed: 179 additions & 107 deletions

.changeset/mollifier-redis-worker-primitives.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
"@trigger.dev/redis-worker": patch
33
---
44

5-
Add MollifierBuffer (with `accept`, `pop`, `ack`, `requeue`, `fail`, and `evaluateTrip`) and MollifierDrainer primitives for trigger burst smoothing. `evaluateTrip` is an atomic Lua sliding-window trip evaluator used by the webapp gate to detect per-env trigger bursts. Webapp shadow-mode logging is wired; buffer writes and drainer activation are deferred to a follow-up.
5+
Add MollifierBuffer (with `accept`, `pop`, `ack`, `requeue`, `fail`, and `evaluateTrip`) and MollifierDrainer primitives for trigger burst smoothing. `evaluateTrip` is an atomic Lua sliding-window trip evaluator used by the webapp gate to detect per-env trigger bursts. Phase 1 wires MollifierBuffer dual-write monitoring alongside the real trigger path and runs MollifierDrainer's pop/ack loop end-to-end with a no-op handler; full buffering and replayed drainer-side triggers land in later phases.

apps/webapp/app/services/worker.server.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,24 @@ export async function init() {
130130
await workerQueue.initialize();
131131
}
132132

133-
getMollifierDrainer();
133+
try {
134+
const drainer = getMollifierDrainer();
135+
if (drainer) {
136+
// The drainer owns a polling loop and a Redis client; let it drain
137+
// in-flight pops on shutdown rather than tearing the process down
138+
// mid-handler. Idempotent — `drainer.stop()` short-circuits if already
139+
// stopped, so registering on both signals is safe.
140+
const stopDrainer = () => {
141+
drainer.stop().catch((error) => {
142+
logger.error("Failed to stop mollifier drainer", { error });
143+
});
144+
};
145+
process.once("SIGTERM", stopDrainer);
146+
process.once("SIGINT", stopDrainer);
147+
}
148+
} catch (error) {
149+
logger.error("Failed to initialise mollifier drainer", { error });
150+
}
134151
}
135152

136153
function getWorkerQueue() {

apps/webapp/app/v3/mollifier/bufferedTriggerPayload.server.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,10 @@ export function buildBufferedTriggerPayload(input: {
9292
taskId: input.taskId,
9393
body: input.body,
9494
idempotencyKey: input.idempotencyKey,
95-
idempotencyKeyExpiresAt: input.idempotencyKeyExpiresAt
96-
? input.idempotencyKeyExpiresAt.toISOString()
97-
: null,
95+
idempotencyKeyExpiresAt:
96+
input.idempotencyKey && input.idempotencyKeyExpiresAt
97+
? input.idempotencyKeyExpiresAt.toISOString()
98+
: null,
9899
tags: input.tags,
99100
parentRunFriendlyId: input.parentRunFriendlyId,
100101
traceContext: input.traceContext,

apps/webapp/app/v3/mollifier/mollifierGate.server.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ import { flag } from "~/v3/featureFlags.server";
44
import { FEATURE_FLAG } from "~/v3/featureFlags";
55
import { getMollifierBuffer } from "./mollifierBuffer.server";
66
import { createRealTripEvaluator } from "./mollifierTripEvaluator.server";
7-
import { recordDecision, type DecisionOutcome } from "./mollifierTelemetry.server";
7+
import {
8+
recordDecision,
9+
type DecisionOutcome,
10+
type DecisionReason,
11+
} from "./mollifierTelemetry.server";
812

913
// `count` is the *single-instance* sliding-window counter, not a fleet-wide
1014
// aggregate. Each webapp instance maintains its own Redis key, so the fleet
@@ -37,7 +41,7 @@ export type TripEvaluator = (inputs: GateInputs) => Promise<TripDecision>;
3741
export type GateDependencies = {
3842
isMollifierEnabled: () => boolean;
3943
isShadowModeOn: () => boolean;
40-
resolveOrgFlag: () => Promise<boolean>;
44+
resolveFlag: () => Promise<boolean>;
4145
evaluator: TripEvaluator;
4246
logShadow: (
4347
inputs: GateInputs,
@@ -47,7 +51,7 @@ export type GateDependencies = {
4751
inputs: GateInputs,
4852
decision: Extract<TripDecision, { divert: true }>,
4953
) => void;
50-
recordDecision: (outcome: DecisionOutcome, reason?: string) => void;
54+
recordDecision: (outcome: DecisionOutcome, reason?: DecisionReason) => void;
5155
};
5256

5357
// `options` is a thunk so env reads happen per-evaluation, not at module load.
@@ -82,7 +86,7 @@ function logDivertDecision(
8286
export const defaultGateDependencies: GateDependencies = {
8387
isMollifierEnabled: () => env.MOLLIFIER_ENABLED === "1",
8488
isShadowModeOn: () => env.MOLLIFIER_SHADOW_MODE === "1",
85-
resolveOrgFlag: () =>
89+
resolveFlag: () =>
8690
flag({ key: FEATURE_FLAG.mollifierEnabled, defaultValue: false }),
8791
evaluator: defaultEvaluator,
8892
logShadow: (inputs, decision) =>
@@ -103,10 +107,10 @@ export async function evaluateGate(
103107
return { action: "pass_through" };
104108
}
105109

106-
const orgFlagEnabled = await d.resolveOrgFlag();
110+
const flagEnabled = await d.resolveFlag();
107111
const shadowOn = d.isShadowModeOn();
108112

109-
if (!orgFlagEnabled && !shadowOn) {
113+
if (!flagEnabled && !shadowOn) {
110114
d.recordDecision("pass_through");
111115
return { action: "pass_through" };
112116
}
@@ -117,7 +121,7 @@ export async function evaluateGate(
117121
return { action: "pass_through" };
118122
}
119123

120-
if (orgFlagEnabled) {
124+
if (flagEnabled) {
121125
d.logMollified(inputs, decision);
122126
d.recordDecision("mollify", decision.reason);
123127
return { action: "mollify", decision };

apps/webapp/app/v3/mollifier/mollifierTelemetry.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ export const mollifierDecisionsCounter = meter.createCounter("mollifier.decision
77
});
88

99
export type DecisionOutcome = "pass_through" | "shadow_log" | "mollify";
10+
export type DecisionReason = "per_env_rate";
1011

11-
export function recordDecision(outcome: DecisionOutcome, reason?: string): void {
12+
export function recordDecision(outcome: DecisionOutcome, reason?: DecisionReason): void {
1213
mollifierDecisionsCounter.add(1, {
1314
outcome,
1415
...(reason ? { reason } : {}),

apps/webapp/test/bufferedTriggerPayload.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ describe("buildBufferedTriggerPayload", () => {
5050
const noKey = buildBufferedTriggerPayload(baseInput);
5151
expect(noKey.idempotencyKey).toBeNull();
5252
expect(noKey.idempotencyKeyExpiresAt).toBeNull();
53+
54+
// Defensive: an expiresAt without an accompanying key is an impossible
55+
// idempotency state — drop the expiresAt rather than serialise it.
56+
const orphanExpiry = buildBufferedTriggerPayload({
57+
...baseInput,
58+
idempotencyKey: null,
59+
idempotencyKeyExpiresAt: new Date("2026-05-13T10:00:00.000Z"),
60+
});
61+
expect(orphanExpiry.idempotencyKey).toBeNull();
62+
expect(orphanExpiry.idempotencyKeyExpiresAt).toBeNull();
5363
});
5464

5565
it("preserves customer body byte-equivalent (drainer replay must match Postgres)", () => {

apps/webapp/test/mollifierGate.test.ts

Lines changed: 62 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,56 @@
1-
import { describe, expect, it, vi } from "vitest";
1+
import { describe, expect, it } from "vitest";
22
import {
33
evaluateGate,
44
type GateDependencies,
55
type GateInputs,
66
type TripDecision,
77
} from "~/v3/mollifier/mollifierGate.server";
8+
import type { DecisionOutcome, DecisionReason } from "~/v3/mollifier/mollifierTelemetry.server";
89

10+
// We deliberately don't use vi.fn here. Per repo policy tests shouldn't lean on
11+
// mock frameworks for behaviours that are pure functions of the inputs — the
12+
// gate is pure decision logic, so a hand-rolled "deps + spy log" wired with
13+
// plain closures gives exactly the assertions we need without the indirection.
914
type Spies = {
10-
[K in keyof GateDependencies]: ReturnType<typeof vi.fn>;
15+
evaluatorCalls: number;
16+
logShadowCalls: Array<{ inputs: GateInputs; decision: Extract<TripDecision, { divert: true }> }>;
17+
logMollifiedCalls: Array<{ inputs: GateInputs; decision: Extract<TripDecision, { divert: true }> }>;
18+
recordDecisionCalls: Array<{ outcome: DecisionOutcome; reason?: DecisionReason }>;
1119
};
1220

13-
function makeDeps(overrides: Partial<GateDependencies> = {}): {
14-
deps: GateDependencies;
15-
spies: Spies;
16-
} {
17-
const defaults: GateDependencies = {
18-
isMollifierEnabled: () => false,
19-
isShadowModeOn: () => false,
20-
resolveOrgFlag: async () => false,
21-
evaluator: async () => ({ divert: false }) as TripDecision,
22-
logShadow: () => {},
23-
logMollified: () => {},
24-
recordDecision: () => {},
21+
type Toggles = {
22+
enabled: boolean;
23+
shadow: boolean;
24+
flag: boolean;
25+
decision: TripDecision;
26+
};
27+
28+
function makeDeps(toggles: Toggles): { deps: GateDependencies; spies: Spies } {
29+
const spies: Spies = {
30+
evaluatorCalls: 0,
31+
logShadowCalls: [],
32+
logMollifiedCalls: [],
33+
recordDecisionCalls: [],
34+
};
35+
const deps: GateDependencies = {
36+
isMollifierEnabled: () => toggles.enabled,
37+
isShadowModeOn: () => toggles.shadow,
38+
resolveFlag: async () => toggles.flag,
39+
evaluator: async () => {
40+
spies.evaluatorCalls += 1;
41+
return toggles.decision;
42+
},
43+
logShadow: (inputs, decision) => {
44+
spies.logShadowCalls.push({ inputs, decision });
45+
},
46+
logMollified: (inputs, decision) => {
47+
spies.logMollifiedCalls.push({ inputs, decision });
48+
},
49+
recordDecision: (outcome, reason) => {
50+
spies.recordDecisionCalls.push({ outcome, reason });
51+
},
2552
};
26-
const merged = { ...defaults, ...overrides };
27-
const spies = {
28-
isMollifierEnabled: vi.fn(merged.isMollifierEnabled),
29-
isShadowModeOn: vi.fn(merged.isShadowModeOn),
30-
resolveOrgFlag: vi.fn(merged.resolveOrgFlag),
31-
evaluator: vi.fn(merged.evaluator),
32-
logShadow: vi.fn(merged.logShadow),
33-
logMollified: vi.fn(merged.logMollified),
34-
recordDecision: vi.fn(merged.recordDecision),
35-
} satisfies Spies;
36-
return { deps: spies, spies };
53+
return { deps, spies };
3754
}
3855

3956
const trippedDecision = {
@@ -101,53 +118,49 @@ describe("evaluateGate cascade — exhaustive truth table", () => {
101118
"row $id: enabled=$enabled shadow=$shadow flag=$flag divert=$divert → action=$expected.action",
102119
async (row) => {
103120
const { deps, spies } = makeDeps({
104-
isMollifierEnabled: () => row.enabled,
105-
isShadowModeOn: () => row.shadow,
106-
resolveOrgFlag: async () => row.flag,
107-
evaluator: async () => (row.divert ? trippedDecision : passDecision),
121+
enabled: row.enabled,
122+
shadow: row.shadow,
123+
flag: row.flag,
124+
decision: row.divert ? trippedDecision : passDecision,
108125
});
109126

110127
const outcome = await evaluateGate(inputs, deps);
111128

112129
expect(outcome.action).toBe(row.expected.action);
113-
expect(spies.evaluator).toHaveBeenCalledTimes(row.expected.evaluatorCalls);
114-
expect(spies.logShadow).toHaveBeenCalledTimes(row.expected.logShadowCalls);
115-
expect(spies.logMollified).toHaveBeenCalledTimes(row.expected.logMollifiedCalls);
130+
expect(spies.evaluatorCalls).toBe(row.expected.evaluatorCalls);
131+
expect(spies.logShadowCalls).toHaveLength(row.expected.logShadowCalls);
132+
expect(spies.logMollifiedCalls).toHaveLength(row.expected.logMollifiedCalls);
116133

117134
// Every evaluation records exactly one decision.
118-
expect(spies.recordDecision).toHaveBeenCalledTimes(1);
119-
if (row.expected.expectedReason === undefined) {
120-
expect(spies.recordDecision).toHaveBeenCalledWith(row.expected.recordedOutcome);
121-
} else {
122-
expect(spies.recordDecision).toHaveBeenCalledWith(
123-
row.expected.recordedOutcome,
124-
row.expected.expectedReason,
125-
);
126-
}
135+
expect(spies.recordDecisionCalls).toHaveLength(1);
136+
expect(spies.recordDecisionCalls[0].outcome).toBe(row.expected.recordedOutcome);
137+
expect(spies.recordDecisionCalls[0].reason).toBe(row.expected.expectedReason);
127138
},
128139
);
129140

130141
it("divert log carries the full decision (envId, orgId, taskId, reason, count, threshold, windowMs, holdMs)", async () => {
131142
const { deps, spies } = makeDeps({
132-
isMollifierEnabled: () => true,
133-
isShadowModeOn: () => true,
134-
evaluator: async () => trippedDecision,
143+
enabled: true,
144+
shadow: true,
145+
flag: false,
146+
decision: trippedDecision,
135147
});
136148

137149
await evaluateGate(inputs, deps);
138150

139-
expect(spies.logShadow).toHaveBeenCalledWith(inputs, trippedDecision);
151+
expect(spies.logShadowCalls).toEqual([{ inputs, decision: trippedDecision }]);
140152
});
141153

142154
it("mollify log carries the full decision (mirrors shadow log)", async () => {
143155
const { deps, spies } = makeDeps({
144-
isMollifierEnabled: () => true,
145-
resolveOrgFlag: async () => true,
146-
evaluator: async () => trippedDecision,
156+
enabled: true,
157+
shadow: false,
158+
flag: true,
159+
decision: trippedDecision,
147160
});
148161

149162
await evaluateGate(inputs, deps);
150163

151-
expect(spies.logMollified).toHaveBeenCalledWith(inputs, trippedDecision);
164+
expect(spies.logMollifiedCalls).toEqual([{ inputs, decision: trippedDecision }]);
152165
});
153166
});

0 commit comments

Comments
 (0)