Skip to content

Commit ee03533

Browse files
committed
chore(mollifier): address review follow-ups for phase-1 PR
- Gate drainer init on WORKER_ENABLED so only worker replicas run the polling loop.
- Update the enqueueSystem TTL comment now that delayed/pending-version are first enqueues.
- Correct the mollifier gate docstring to describe the fixed-window counter and tripped-key rearm.
- Swap findUnique for findFirst in the trigger task test to match the webapp Prisma rule.
1 parent 684e089 commit ee03533

4 files changed

Lines changed: 23 additions & 7 deletions

File tree

apps/webapp/app/services/worker.server.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,14 @@ export async function init() {
131131
await workerQueue.initialize();
132132
}
133133

134+
// Only the worker role drains the mollifier buffer. API-only replicas
135+
// still produce into the buffer via the trigger hot path, but the
136+
// polling loop + Redis consumer connection only belongs on workers —
137+
// otherwise every webapp replica races for the same entries.
138+
if (env.WORKER_ENABLED !== "true") {
139+
return;
140+
}
141+
134142
try {
135143
const drainer = getMollifierDrainer();
136144
if (drainer && !global.__mollifierShutdownRegistered__) {

apps/webapp/app/v3/mollifier/mollifierGate.server.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,15 @@ import {
1010
type DecisionReason,
1111
} from "./mollifierTelemetry.server";
1212

13-
// `count` is the *single-instance* sliding-window counter, not a fleet-wide
14-
// aggregate. Each webapp instance maintains its own Redis key, so the fleet
15-
// effective ceiling is `instance_count * threshold`. Phase 2 consumers must
16-
// not treat `count` as a global rate.
13+
// `count` is the *single-instance* fixed-window counter (INCR with a PEXPIRE
14+
// armed on the first tick of each window — see `mollifierEvaluateTrip` in
15+
// `packages/redis-worker/src/mollifier/buffer.ts`). It is not a fleet-wide
16+
// aggregate: each webapp instance maintains its own Redis key, so the fleet
17+
// effective ceiling is `instance_count * threshold`, and at a window boundary
18+
// the instance can briefly admit up to ~2x threshold before tripping. The
19+
// tripped marker is refreshed on every overage call, so a sustained burst
20+
// holds the divert state until the rate falls below threshold within a
21+
// window. Phase 2 consumers must not treat `count` as a global rate.
1722
export type TripDecision =
1823
| { divert: false }
1924
| {

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1342,7 +1342,7 @@ describe("RunEngineTriggerTaskService", () => {
13421342
// engine.trigger ran — Postgres has the run
13431343
expect(result).toBeDefined();
13441344
expect(result?.run.friendlyId).toBeDefined();
1345-
const pgRun = await prisma.taskRun.findUnique({ where: { id: result!.run.id } });
1345+
const pgRun = await prisma.taskRun.findFirst({ where: { id: result!.run.id } });
13461346
expect(pgRun).not.toBeNull();
13471347
expect(pgRun!.friendlyId).toBe(result!.run.friendlyId);
13481348

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,11 @@ export class EnqueueSystem {
8888

8989
const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;
9090

91-
// Include TTL only when explicitly requested (first enqueue from trigger).
92-
// Re-enqueues (waitpoint, checkpoint, delayed, pending version) must not add TTL.
91+
// Include TTL only when explicitly requested. Callers pass `includeTtl: true`
92+
// on the first enqueue that puts the run into the run queue — the initial
93+
// trigger path, the delayed run release, and the pending-version release.
94+
// Waitpoint/checkpoint re-enqueues must NOT pass it: the run has already
95+
// started, so the queued-but-never-started TTL no longer applies.
9396
let ttlExpiresAt: number | undefined;
9497
if (includeTtl && run.ttl) {
9598
const expireAt = parseNaturalLanguageDuration(run.ttl);

0 commit comments

Comments
 (0)