From a0fc0f3d0b6cc842317380f31e9e439aeaf68b5e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Tue, 23 Jun 2026 10:17:29 +0100 Subject: [PATCH] feat(webapp): gate worker dequeues by worker queue via env var Add RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES: a comma-separated list of worker queues (or base regions, which also cover the :scheduled split) for which the engine API refuses worker dequeue requests and returns no work, so those runs stay queued instead of being handed to workers that cannot run them. Unset means no gating. Blocked dequeues increment the run_engine.dequeue.blocked otel counter, tagged by worker_queue and region. --- .server-changes/dequeue-region-gate.md | 6 +++ apps/webapp/app/env.server.ts | 1 + .../runEngine/concerns/dequeueGate.server.ts | 29 ++++++++++++ .../concerns/workerQueueSplit.server.ts | 22 ++++++++++ .../worker/workerGroupTokenService.server.ts | 15 +++++-- .../test/workerQueueSplit.server.test.ts | 44 +++++++++++++++++++ 6 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 .server-changes/dequeue-region-gate.md create mode 100644 apps/webapp/app/runEngine/concerns/dequeueGate.server.ts create mode 100644 apps/webapp/test/workerQueueSplit.server.test.ts diff --git a/.server-changes/dequeue-region-gate.md b/.server-changes/dequeue-region-gate.md new file mode 100644 index 00000000000..d4f9d6979c4 --- /dev/null +++ b/.server-changes/dequeue-region-gate.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Add a `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES` setting that refuses worker dequeue requests for the listed worker queues (or base regions), so their runs stay queued instead of being handed to workers that can't run them. 
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index f793654dd3a..f2c11aba415 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -809,6 +809,7 @@ const EnvironmentSchema = z
     RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
     RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
     RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
+    RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES: z.string().optional(),
     RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
     RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
     RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
diff --git a/apps/webapp/app/runEngine/concerns/dequeueGate.server.ts b/apps/webapp/app/runEngine/concerns/dequeueGate.server.ts
new file mode 100644
index 00000000000..1068cd4c19d
--- /dev/null
+++ b/apps/webapp/app/runEngine/concerns/dequeueGate.server.ts
@@ -0,0 +1,29 @@
+import { getMeter } from "@internal/tracing";
+import { env } from "~/env.server";
+import {
+  baseWorkerQueue,
+  matchesDisabledWorkerQueue,
+  parseDisabledWorkerQueues,
+} from "./workerQueueSplit.server";
+
+const meter = getMeter("run-engine-dequeue-gate");
+
+const blockedDequeueCounter = meter.createCounter("run_engine.dequeue.blocked", {
+  description:
+    "Count of worker dequeue requests refused because the worker queue is gated off via RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES",
+});
+
+const disabledWorkerQueues = parseDisabledWorkerQueues(
+  env.RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES
+);
+
+export function isWorkerQueueDequeueDisabled(workerQueue: string): boolean {
+  return matchesDisabledWorkerQueue(workerQueue, disabledWorkerQueues);
+}
+
+export function recordBlockedDequeue(workerQueue: string): void {
+  blockedDequeueCounter.add(1, {
+    worker_queue: workerQueue,
+    region: baseWorkerQueue(workerQueue),
+  });
+}
diff --git a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
index a2bf5df87b3..c309e79e02e 100644
--- a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
+++ b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
@@ -122,3 +122,25 @@ export function workerQueueForClass(
 
   return masterQueue;
 }
+
+export function parseDisabledWorkerQueues(raw: string | undefined): Set<string> {
+  return new Set(
+    (raw ?? "")
+      .split(",")
+      .map((entry) => entry.trim())
+      .filter(Boolean)
+  );
+}
+
+export function matchesDisabledWorkerQueue(
+  workerQueue: string,
+  disabledWorkerQueues: ReadonlySet<string>
+): boolean {
+  if (disabledWorkerQueues.size === 0) {
+    return false;
+  }
+
+  return (
+    disabledWorkerQueues.has(workerQueue) || disabledWorkerQueues.has(baseWorkerQueue(workerQueue))
+  );
+}
diff --git a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
index cb4a90cc597..7f52e32d5c3 100644
--- a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
+++ b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts
@@ -28,6 +28,10 @@ import { singleton } from "~/utils/singleton";
 import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server";
 import { machinePresetFromName } from "~/v3/machinePresets.server";
 import { workerQueueForClass } from "~/runEngine/concerns/workerQueueSplit.server";
+import {
+  isWorkerQueueDequeueDisabled,
+  recordBlockedDequeue,
+} from "~/runEngine/concerns/dequeueGate.server";
 import { WithRunEngine, WithRunEngineOptions } from "../baseService.server";
 
 const authenticatedWorkerInstanceCache = singleton(
@@ -377,11 +381,16 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
     runnerId?: string;
     queueClass?: WorkerQueueClass;
   }): Promise {
-    // Derive the actual queue from this worker's own masterQueue + class, so a
-    // token can only ever reach its own region's queues (default or :scheduled).
+    const workerQueue = workerQueueForClass(this.masterQueue, queueClass);
+
+    if (isWorkerQueueDequeueDisabled(workerQueue)) {
+      recordBlockedDequeue(workerQueue);
+      return [];
+    }
+
     return await this._engine.dequeueFromWorkerQueue({
       consumerId: this.workerInstanceId,
-      workerQueue: workerQueueForClass(this.masterQueue, queueClass),
+      workerQueue,
       workerId: this.workerInstanceId,
       runnerId,
     });
diff --git a/apps/webapp/test/workerQueueSplit.server.test.ts b/apps/webapp/test/workerQueueSplit.server.test.ts
new file mode 100644
index 00000000000..c8182b83105
--- /dev/null
+++ b/apps/webapp/test/workerQueueSplit.server.test.ts
@@ -0,0 +1,44 @@
+import { describe, expect, it } from "vitest";
+import {
+  matchesDisabledWorkerQueue,
+  parseDisabledWorkerQueues,
+} from "~/runEngine/concerns/workerQueueSplit.server";
+
+describe("parseDisabledWorkerQueues", () => {
+  it("returns an empty set for undefined or empty input", () => {
+    expect(parseDisabledWorkerQueues(undefined).size).toBe(0);
+    expect(parseDisabledWorkerQueues("").size).toBe(0);
+    expect(parseDisabledWorkerQueues(" , ,").size).toBe(0);
+  });
+
+  it("splits, trims, and drops empties", () => {
+    const parsed = parseDisabledWorkerQueues(" eu-central-1 , us-east-1:scheduled ,, ");
+    expect([...parsed]).toEqual(["eu-central-1", "us-east-1:scheduled"]);
+  });
+});
+
+describe("matchesDisabledWorkerQueue", () => {
+  it("never matches when the disabled set is empty", () => {
+    const empty = parseDisabledWorkerQueues(undefined);
+    expect(matchesDisabledWorkerQueue("eu-central-1", empty)).toBe(false);
+    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", empty)).toBe(false);
+  });
+
+  it("gates the base region and its scheduled split when the base region is listed", () => {
+    const disabled = parseDisabledWorkerQueues("eu-central-1");
+    expect(matchesDisabledWorkerQueue("eu-central-1", disabled)).toBe(true);
+    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", disabled)).toBe(true);
+  });
+
+  it("leaves other regions alone", () => {
+    const disabled = parseDisabledWorkerQueues("eu-central-1");
+    expect(matchesDisabledWorkerQueue("us-east-1", disabled)).toBe(false);
+    expect(matchesDisabledWorkerQueue("us-east-1:scheduled", disabled)).toBe(false);
+  });
+
+  it("gates only the scheduled split when a full worker queue is listed", () => {
+    const disabled = parseDisabledWorkerQueues("eu-central-1:scheduled");
+    expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", disabled)).toBe(true);
+    expect(matchesDisabledWorkerQueue("eu-central-1", disabled)).toBe(false);
+  });
+});