feat(mollifier): trigger burst smoothing — Phase 1 (monitoring) #3614
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| --- | ||
| "@trigger.dev/redis-worker": patch | ||
| --- | ||
|
|
||
| Add MollifierBuffer and MollifierDrainer primitives for trigger burst smoothing. | ||
|
|
||
| MollifierBuffer (`accept`, `pop`, `ack`, `requeue`, `fail`, `evaluateTrip`) is a per-env FIFO over Redis with atomic Lua transitions for status tracking. `evaluateTrip` is a sliding-window trip evaluator the webapp gate uses to detect per-env trigger bursts. | ||
|
|
||
| MollifierDrainer pops entries through a polling loop with a user-supplied handler. The loop survives transient Redis errors via capped exponential backoff (up to 5s), and per-env pop failures don't poison the rest of the batch — one env's blip is logged and counted as failed for that tick. Rotation is two-level: orgs at the top, envs within each org. The buffer maintains `mollifier:orgs` and `mollifier:org-envs:${orgId}` atomically with per-env queues, so the drainer walks orgs → envs directly without an in-memory cache. The `maxOrgsPerTick` option (default 500) caps how many orgs are scheduled per tick; for each picked org, one env is popped (rotating round-robin within the org). An org with N envs gets the same per-tick scheduling slot as an org with 1 env, so tenant-level drainage throughput is determined by org count rather than env count. |
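
The two-level rotation described in this changeset can be pictured with a small in-memory sketch. It is not the MollifierDrainer implementation: the real drainer reads `mollifier:orgs` and `mollifier:org-envs:${orgId}` from Redis, whereas `pickEnvsForTick`, the `Map` input, and the `Cursors` object below are hypothetical stand-ins chosen for illustration.

```typescript
// Hypothetical in-memory model of org -> env rotation. The real drainer
// walks Redis sets; these Maps only illustrate the scheduling fairness.
type Cursors = { org: number; envByOrg: Map<string, number> };

function pickEnvsForTick(
  orgEnvs: Map<string, string[]>, // orgId -> envIds with pending entries
  cursors: Cursors,
  maxOrgsPerTick: number
): string[] {
  const orgIds = [...orgEnvs.keys()];
  const picked: string[] = [];
  const count = Math.min(maxOrgsPerTick, orgIds.length);

  for (let i = 0; i < count; i++) {
    // Top level: rotate through orgs, resuming where the last tick stopped.
    const orgId = orgIds[(cursors.org + i) % orgIds.length];
    if (orgId === undefined) continue;
    const envs = orgEnvs.get(orgId) ?? [];
    if (envs.length === 0) continue;

    // Second level: one env per org per tick, round-robin within the org.
    const envCursor = cursors.envByOrg.get(orgId) ?? 0;
    const envId = envs[envCursor % envs.length];
    if (envId !== undefined) picked.push(envId);
    cursors.envByOrg.set(orgId, envCursor + 1);
  }

  if (orgIds.length > 0) {
    cursors.org = (cursors.org + count) % orgIds.length;
  }
  return picked;
}

const orgEnvs = new Map<string, string[]>([
  ["org_a", ["a1", "a2", "a3"]],
  ["org_b", ["b1"]],
]);
const cursors: Cursors = { org: 0, envByOrg: new Map() };

const tick1 = pickEnvsForTick(orgEnvs, cursors, 500);
const tick2 = pickEnvsForTick(orgEnvs, cursors, 500);
console.log(tick1, tick2);
```

In this toy run each tick yields one env per org, so `org_a` with three envs receives the same number of scheduling slots as `org_b` with one, which is the fairness property the changeset describes.
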
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|
|
||
| Lay the groundwork for an opt-in burst-protection layer on the trigger hot path. This release ships **monitoring only** — operators can observe per-env trigger storms via two opt-in modes, but no trigger calls are diverted or rate-limited yet (active burst smoothing follows in a later release). All new env vars default off, so existing deployments see no behaviour change. With `MOLLIFIER_SHADOW_MODE=1`, each trigger evaluates a per-env rate counter and logs `mollifier.would_mollify` when the threshold is crossed. With `MOLLIFIER_ENABLED=1` plus a per-org `mollifierEnabled` flag, over-threshold triggers are also recorded in a Redis audit buffer alongside the normal `engine.trigger` call, drained by a background no-op consumer. Emits the `mollifier.decisions` OTel counter for per-env rate visibility. |
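
A minimal sketch of what a per-env sliding-window trip check might look like, assuming an in-memory timestamp list rather than the Redis-backed `evaluateTrip`. The class name `SlidingWindowTrip`, the `windowMs` and `threshold` parameters, and the explicit `now` argument are illustrative assumptions, not names from this PR.

```typescript
// Hypothetical in-memory sliding-window evaluator. The PR's evaluateTrip
// runs against Redis; this only shows the "count within window" idea.
class SlidingWindowTrip {
  private readonly hits = new Map<string, number[]>();
  private readonly windowMs: number;
  private readonly threshold: number;

  constructor(windowMs: number, threshold: number) {
    this.windowMs = windowMs;
    this.threshold = threshold;
  }

  // Records one trigger for envId at time `now` (ms) and reports whether
  // the number of triggers inside the window exceeds the threshold.
  record(envId: string, now: number): { count: number; tripped: boolean } {
    const cutoff = now - this.windowMs;
    const recent = (this.hits.get(envId) ?? []).filter((t) => t > cutoff);
    recent.push(now);
    this.hits.set(envId, recent);
    return { count: recent.length, tripped: recent.length > this.threshold };
  }
}

// Window of 1000 ms, threshold of 3 triggers.
const trip = new SlidingWindowTrip(1000, 3);
const results = [0, 100, 200, 300, 2000].map((t) => trip.record("env_1", t));
console.log(results.map((r) => r.tripped));
```

In shadow mode, a `tripped: true` result would correspond to the point where `mollifier.would_mollify` is logged, while the trigger itself still proceeds normally.
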
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,18 @@ import type { | |
| TriggerTaskRequest, | ||
| TriggerTaskValidator, | ||
| } from "../types"; | ||
| import { env } from "~/env.server"; | ||
| import { | ||
| evaluateGate as defaultEvaluateGate, | ||
| type GateOutcome, | ||
| type MollifierEvaluateGate, | ||
| } from "~/v3/mollifier/mollifierGate.server"; | ||
| import { | ||
| getMollifierBuffer as defaultGetMollifierBuffer, | ||
| type MollifierGetBuffer, | ||
| } from "~/v3/mollifier/mollifierBuffer.server"; | ||
| import { buildBufferedTriggerPayload } from "~/v3/mollifier/bufferedTriggerPayload.server"; | ||
| import { serialiseSnapshot } from "@trigger.dev/redis-worker"; | ||
| import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server"; | ||
|
|
||
| class NoopTriggerRacepointSystem implements TriggerRacepointSystem { | ||
|
|
@@ -59,6 +71,11 @@ export class RunEngineTriggerTaskService { | |
| private readonly traceEventConcern: TraceEventConcern; | ||
| private readonly triggerRacepointSystem: TriggerRacepointSystem; | ||
| private readonly metadataMaximumSize: number; | ||
| // Mollifier hooks are DI'd so tests can drive the call-site's mollify branch | ||
| // deterministically (stub the gate to return mollify, inject a real or fake | ||
| // buffer). In production both default to the live module-level singletons. | ||
| private readonly evaluateGate: MollifierEvaluateGate; | ||
| private readonly getMollifierBuffer: MollifierGetBuffer; | ||
|
|
||
| constructor(opts: { | ||
| prisma: PrismaClientOrTransaction; | ||
|
|
@@ -71,6 +88,8 @@ export class RunEngineTriggerTaskService { | |
| tracer: Tracer; | ||
| metadataMaximumSize: number; | ||
| triggerRacepointSystem?: TriggerRacepointSystem; | ||
| evaluateGate?: MollifierEvaluateGate; | ||
| getMollifierBuffer?: MollifierGetBuffer; | ||
| }) { | ||
| this.prisma = opts.prisma; | ||
| this.engine = opts.engine; | ||
|
|
@@ -82,6 +101,8 @@ export class RunEngineTriggerTaskService { | |
| this.traceEventConcern = opts.traceEventConcern; | ||
| this.metadataMaximumSize = opts.metadataMaximumSize; | ||
| this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem(); | ||
| this.evaluateGate = opts.evaluateGate ?? defaultEvaluateGate; | ||
| this.getMollifierBuffer = opts.getMollifierBuffer ?? defaultGetMollifierBuffer; | ||
| } | ||
|
|
||
| public async call({ | ||
|
|
@@ -316,6 +337,23 @@ export class RunEngineTriggerTaskService { | |
| taskKind: taskKind ?? "STANDARD", | ||
| }; | ||
|
|
||
| // Short-circuit before the gate when mollifier is globally off (the | ||
| // default for every deployment that hasn't opted in). Avoids the | ||
| // GateInputs allocation, the deps spread inside `evaluateGate`, and | ||
| // the `mollifier.decisions{outcome=pass_through}` OTel increment on | ||
| // every trigger — `triggerTask` is the highest-throughput code path | ||
| // in the system. When the flag is on, behaviour is unchanged. | ||
| const mollifierOutcome: GateOutcome | null = | ||
| env.TRIGGER_MOLLIFIER_ENABLED === "1" | ||
| ? await this.evaluateGate({ | ||
| envId: environment.id, | ||
| orgId: environment.organizationId, | ||
| taskId, | ||
| orgFeatureFlags: | ||
| (environment.organization.featureFlags as Record<string, unknown> | null) ?? null, | ||
| }) | ||
| : null; | ||
|
Comment on lines +346 to +355 (Contributor):
🔴 Hard-coded env check. The short-circuit at the call site reads `env.TRIGGER_MOLLIFIER_ENABLED` directly, so an injected `evaluateGate` is never reached unless that variable is `"1"`. This means three mollifier integration tests will fail.
The DI pattern is correctly wired at construction (lines 91–105) but defeated by the env read at the call site. The constructor accepts optional `evaluateGate` and `getMollifierBuffer` overrides. |
||
|
|
||
| try { | ||
| return await this.traceEventConcern.traceRun( | ||
| triggerRequest, | ||
|
|
@@ -328,6 +366,74 @@ export class RunEngineTriggerTaskService { | |
|
|
||
| const payloadPacket = await this.payloadProcessor.process(triggerRequest); | ||
|
|
||
| // Phase 1 dual-write: if the org has the mollifier feature flag | ||
| // enabled and the per-env trip evaluator says divert, write the | ||
| // canonical replay payload to the buffer AND continue through | ||
| // engine.trigger as normal. The buffer entry is an audit/preview | ||
| // copy; the drainer's no-op handler consumes it to prove the | ||
| // dequeue mechanism works. Phase 2 will replace engine.trigger | ||
| // (below) with a synthesised 200 response and rely on the | ||
| // drainer to perform the Postgres write via replay. | ||
| if (mollifierOutcome?.action === "mollify") { | ||
| const buffer = this.getMollifierBuffer(); | ||
| if (buffer) { | ||
| const canonicalPayload = buildBufferedTriggerPayload({ | ||
| runFriendlyId, | ||
| taskId, | ||
| envId: environment.id, | ||
| envType: environment.type, | ||
| envSlug: environment.slug, | ||
| orgId: environment.organizationId, | ||
| orgSlug: environment.organization.slug, | ||
| projectId: environment.projectId, | ||
| projectRef: environment.project.externalRef, | ||
| body, | ||
| idempotencyKey: idempotencyKey ?? null, | ||
| idempotencyKeyExpiresAt: idempotencyKey | ||
| ? idempotencyKeyExpiresAt ?? null | ||
| : null, | ||
| tags, | ||
| parentRunFriendlyId: parentRun?.friendlyId ?? null, | ||
| traceContext: event.traceContext, | ||
| triggerSource, | ||
| triggerAction, | ||
| serviceOptions: options, | ||
| createdAt: new Date(), | ||
| }); | ||
|
|
||
| try { | ||
| const serialisedPayload = serialiseSnapshot(canonicalPayload); | ||
| await buffer.accept({ | ||
| runId: runFriendlyId, | ||
| envId: environment.id, | ||
| orgId: environment.organizationId, | ||
| payload: serialisedPayload, | ||
| }); | ||
| // Light log on the hot path — keep this synchronous work | ||
| // O(1) per trigger. The drainer computes the payload hash | ||
| // off-path; operators correlate `mollifier.buffered` → | ||
| // `mollifier.drained` by runId. | ||
| logger.debug("mollifier.buffered", { | ||
| runId: runFriendlyId, | ||
| envId: environment.id, | ||
| orgId: environment.organizationId, | ||
| taskId, | ||
| payloadBytes: serialisedPayload.length, | ||
| }); | ||
| } catch (err) { | ||
| // Fail-open: buffer write must never block the customer's | ||
| // trigger. engine.trigger below is the primary write path | ||
| // in Phase 1 — the customer still gets a valid run. | ||
| logger.error("mollifier.buffer_accept_failed", { | ||
| runId: runFriendlyId, | ||
| envId: environment.id, | ||
| taskId, | ||
| err: err instanceof Error ? err.message : String(err), | ||
| }); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| const taskRun = await this.engine.trigger( | ||
| { | ||
| friendlyId: runFriendlyId, | ||
|
|
||
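
One way to address the review comment on lines +346 to +355 is to inject the enabled check alongside the gate, so tests can drive the mollify branch without setting the environment variable. This is only a sketch under assumptions: `isMollifierEnabled`, `EnvLike`, the trimmed `GateInputs` and `GateOutcome` types, and the `TriggerService` class are hypothetical and not part of the PR.

```typescript
// Hypothetical, trimmed-down types; the real ones live in mollifierGate.server.
type GateOutcome = { action: "pass_through" } | { action: "mollify" };
type GateInputs = { envId: string; orgId: string; taskId: string };
type EvaluateGate = (inputs: GateInputs) => Promise<GateOutcome>;
type EnvLike = Record<string, string | undefined>;

class TriggerService {
  private readonly evaluateGate: EvaluateGate;
  private readonly isMollifierEnabled: () => boolean;

  constructor(opts: {
    evaluateGate: EvaluateGate;
    env: EnvLike;
    isMollifierEnabled?: () => boolean;
  }) {
    this.evaluateGate = opts.evaluateGate;
    // Default mirrors the env check in the diff; tests can override it.
    this.isMollifierEnabled =
      opts.isMollifierEnabled ?? (() => opts.env["TRIGGER_MOLLIFIER_ENABLED"] === "1");
  }

  async gate(inputs: GateInputs): Promise<GateOutcome | null> {
    // Still short-circuits before any gate work when the feature is off.
    return this.isMollifierEnabled() ? this.evaluateGate(inputs) : null;
  }
}

async function demo(): Promise<{ off: string | null; forcedOn: string | null; gateCalls: number }> {
  let gateCalls = 0;
  const stubGate: EvaluateGate = async () => {
    gateCalls++;
    return { action: "mollify" };
  };
  const inputs: GateInputs = { envId: "env_1", orgId: "org_1", taskId: "task_1" };

  // Flag unset: the gate is skipped entirely.
  const off = await new TriggerService({ evaluateGate: stubGate, env: {} }).gate(inputs);
  // Test override: the stub gate runs even though the env flag is unset.
  const forcedOn = await new TriggerService({
    evaluateGate: stubGate,
    env: {},
    isMollifierEnabled: () => true,
  }).gate(inputs);

  return { off: off?.action ?? null, forcedOn: forcedOn?.action ?? null, gateCalls };
}

demo().then((result) => console.log(result));
```

The production default keeps the cheap short-circuit on the hot path, while a test can pass `isMollifierEnabled: () => true` and a stubbed gate to reach the buffer-write branch deterministically.
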
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| import type { TriggerTaskRequestBody } from "@trigger.dev/core/v3"; | ||
| import type { TriggerTaskServiceOptions } from "~/v3/services/triggerTask.server"; | ||
|
|
||
| // Canonical payload shape written to the mollifier buffer when the gate | ||
| // decides to mollify a trigger. Phase 1 ALSO calls engine.trigger directly | ||
| // (dual-write) so this is currently an audit/preview record. Phase 2 will | ||
| // make the buffer the primary write path: the drainer's handler will read | ||
| // this payload and replay it through engine.trigger to create the run in | ||
| // Postgres, and read-fallback endpoints will synthesise a Run view from it | ||
| // while it is still QUEUED. | ||
| // | ||
| // CONTRACT: this shape must contain everything needed for Phase 2's | ||
| // drainer-replay to reconstruct an equivalent engine.trigger call. Phase 1 | ||
| // already serialises it into the Redis buffer as an audit record; Phase 2 | ||
| // will rebuild it on the drain side. Keep it serialisable: no functions, | ||
| // no class instances. | ||
| export type BufferedTriggerPayload = { | ||
| runFriendlyId: string; | ||
|
|
||
| // Routing identifiers — let the drainer re-fetch full AuthenticatedEnvironment | ||
| // at replay time rather than embedding it in the payload. | ||
| envId: string; | ||
| envType: string; | ||
| envSlug: string; | ||
| orgId: string; | ||
| orgSlug: string; | ||
| projectId: string; | ||
| projectRef: string; | ||
|
|
||
| // Task identifier — looked up against the locked BackgroundWorkerTask | ||
| // at replay time to recover task-defaults. | ||
| taskId: string; | ||
|
|
||
| // Customer-supplied trigger body (payload, options, context). | ||
| body: TriggerTaskRequestBody; | ||
|
|
||
| // Resolved values from upstream concerns. The drainer should NOT re-resolve | ||
| // these — that would create a second idempotency-key check, etc. | ||
| idempotencyKey: string | null; | ||
| idempotencyKeyExpiresAt: string | null; | ||
| tags: string[]; | ||
|
|
||
| // Parent/root linkage for nested triggers. | ||
| parentRunFriendlyId: string | null; | ||
|
|
||
| // Trace context — propagates the original triggering span across the | ||
| // buffer→drain boundary so the run's lifecycle stays under one trace. | ||
| traceContext: Record<string, unknown>; | ||
|
|
||
| // Annotations + service options that influence routing/replay. | ||
| triggerSource: string; | ||
| triggerAction: string; | ||
| serviceOptions: TriggerTaskServiceOptions; | ||
|
|
||
| // Wall-clock instants relevant to the run. | ||
| createdAt: string; | ||
| }; | ||
|
|
||
| // Assemble the canonical payload from the inputs available at the point | ||
| // `evaluateGate` returns "mollify" in `RunEngineTriggerTaskService.call`. | ||
| // All fields must be derivable from data already in scope at that call site; | ||
| // nothing should require an extra DB lookup. | ||
| export function buildBufferedTriggerPayload(input: { | ||
| runFriendlyId: string; | ||
| taskId: string; | ||
| envId: string; | ||
| envType: string; | ||
| envSlug: string; | ||
| orgId: string; | ||
| orgSlug: string; | ||
| projectId: string; | ||
| projectRef: string; | ||
| body: TriggerTaskRequestBody; | ||
| idempotencyKey: string | null; | ||
| idempotencyKeyExpiresAt: Date | null; | ||
| tags: string[]; | ||
| parentRunFriendlyId: string | null; | ||
| traceContext: Record<string, unknown>; | ||
| triggerSource: string; | ||
| triggerAction: string; | ||
| serviceOptions: TriggerTaskServiceOptions; | ||
| createdAt: Date; | ||
| }): BufferedTriggerPayload { | ||
| return { | ||
| runFriendlyId: input.runFriendlyId, | ||
| envId: input.envId, | ||
| envType: input.envType, | ||
| envSlug: input.envSlug, | ||
| orgId: input.orgId, | ||
| orgSlug: input.orgSlug, | ||
| projectId: input.projectId, | ||
| projectRef: input.projectRef, | ||
| taskId: input.taskId, | ||
| body: input.body, | ||
| idempotencyKey: input.idempotencyKey, | ||
| idempotencyKeyExpiresAt: | ||
| input.idempotencyKey && input.idempotencyKeyExpiresAt | ||
| ? input.idempotencyKeyExpiresAt.toISOString() | ||
| : null, | ||
| tags: input.tags, | ||
| parentRunFriendlyId: input.parentRunFriendlyId, | ||
| traceContext: input.traceContext, | ||
| triggerSource: input.triggerSource, | ||
| triggerAction: input.triggerAction, | ||
| serviceOptions: input.serviceOptions, | ||
| createdAt: input.createdAt.toISOString(), | ||
| }; | ||
| } |
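
The serialisability contract in the file above can be checked with a JSON round trip. The sketch below uses a trimmed, hypothetical `MiniPayload` type and plain `JSON.stringify`/`JSON.parse` as stand-ins; it does not call `serialiseSnapshot`, whose exact behaviour is not shown in this diff.

```typescript
// Hypothetical subset of BufferedTriggerPayload, for illustration only.
type MiniPayload = {
  runFriendlyId: string;
  idempotencyKey: string | null;
  idempotencyKeyExpiresAt: string | null;
  tags: string[];
  createdAt: string;
};

function buildMiniPayload(input: {
  runFriendlyId: string;
  idempotencyKey: string | null;
  idempotencyKeyExpiresAt: Date | null;
  tags: string[];
  createdAt: Date;
}): MiniPayload {
  return {
    runFriendlyId: input.runFriendlyId,
    idempotencyKey: input.idempotencyKey,
    // Same rule as the builder in the diff: an expiry without a key is dropped.
    idempotencyKeyExpiresAt:
      input.idempotencyKey && input.idempotencyKeyExpiresAt
        ? input.idempotencyKeyExpiresAt.toISOString()
        : null,
    tags: input.tags,
    createdAt: input.createdAt.toISOString(),
  };
}

const payload = buildMiniPayload({
  runFriendlyId: "run_123",
  idempotencyKey: null,
  idempotencyKeyExpiresAt: new Date("2025-01-01T00:00:00.000Z"),
  tags: ["burst"],
  createdAt: new Date("2025-01-01T00:00:00.000Z"),
});

// Dates were converted to ISO strings up front, so the round trip is lossless.
const roundTripped: MiniPayload = JSON.parse(JSON.stringify(payload));
console.log(roundTripped);
```

Converting `Date` values to ISO strings inside the builder, rather than at write time, means the stored shape and the in-memory shape are identical, which is what lets a later replay step rebuild the payload without type-specific revival logic.
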