Skip to content

Commit 2fa84ea

Browse files
authored
feat(webapp): gate worker dequeues by worker queue via env var (#4030)
## Summary Adds a `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES` setting that refuses worker dequeue requests for the listed worker queues (or base regions), so their runs stay queued instead of being handed to workers that can't run them. Blocked dequeues are counted via a `run_engine.dequeue.blocked` OTel counter (labeled by `worker_queue` and `region`).
1 parent 8890d7a commit 2fa84ea

6 files changed

Lines changed: 114 additions & 3 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add a `RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES` setting that refuses worker dequeue requests for the listed worker queues (or base regions), so their runs stay queued instead of being handed to workers that can't run them.

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,7 @@ const EnvironmentSchema = z
827827
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
828828
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
829829
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
830+
RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES: z.string().optional(),
830831
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
831832
// Off by default. Enable on a single service (e.g. the engine worker) so only one
832833
// instance reports worker queue length, rather than every replica.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { getMeter } from "@internal/tracing";
2+
import { env } from "~/env.server";
3+
import {
4+
baseWorkerQueue,
5+
matchesDisabledWorkerQueue,
6+
parseDisabledWorkerQueues,
7+
} from "./workerQueueSplit.server";
8+
9+
const meter = getMeter("run-engine-dequeue-gate");
10+
11+
const blockedDequeueCounter = meter.createCounter("run_engine.dequeue.blocked", {
12+
description:
13+
"Count of worker dequeue requests refused because the worker queue is gated off via RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES",
14+
});
15+
16+
const disabledWorkerQueues = parseDisabledWorkerQueues(
17+
env.RUN_ENGINE_DEQUEUE_DISABLED_WORKER_QUEUES
18+
);
19+
20+
export function isWorkerQueueDequeueDisabled(workerQueue: string): boolean {
21+
return matchesDisabledWorkerQueue(workerQueue, disabledWorkerQueues);
22+
}
23+
24+
export function recordBlockedDequeue(workerQueue: string): void {
25+
blockedDequeueCounter.add(1, {
26+
worker_queue: workerQueue,
27+
region: baseWorkerQueue(workerQueue),
28+
});
29+
}

apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,25 @@ export function workerQueueForClass(
122122

123123
return masterQueue;
124124
}
125+
126+
export function parseDisabledWorkerQueues(raw: string | undefined): Set<string> {
127+
return new Set(
128+
(raw ?? "")
129+
.split(",")
130+
.map((entry) => entry.trim())
131+
.filter(Boolean)
132+
);
133+
}
134+
135+
export function matchesDisabledWorkerQueue(
136+
workerQueue: string,
137+
disabledWorkerQueues: ReadonlySet<string>
138+
): boolean {
139+
if (disabledWorkerQueues.size === 0) {
140+
return false;
141+
}
142+
143+
return (
144+
disabledWorkerQueues.has(workerQueue) || disabledWorkerQueues.has(baseWorkerQueue(workerQueue))
145+
);
146+
}

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ import { singleton } from "~/utils/singleton";
2828
import { resolveVariablesForEnvironment } from "~/v3/environmentVariables/environmentVariablesRepository.server";
2929
import { machinePresetFromName } from "~/v3/machinePresets.server";
3030
import { workerQueueForClass } from "~/runEngine/concerns/workerQueueSplit.server";
31+
import {
32+
isWorkerQueueDequeueDisabled,
33+
recordBlockedDequeue,
34+
} from "~/runEngine/concerns/dequeueGate.server";
3135
import { WithRunEngine, WithRunEngineOptions } from "../baseService.server";
3236

3337
const authenticatedWorkerInstanceCache = singleton(
@@ -377,11 +381,16 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
377381
runnerId?: string;
378382
queueClass?: WorkerQueueClass;
379383
}): Promise<DequeuedMessage[]> {
380-
// Derive the actual queue from this worker's own masterQueue + class, so a
381-
// token can only ever reach its own region's queues (default or :scheduled).
384+
const workerQueue = workerQueueForClass(this.masterQueue, queueClass);
385+
386+
if (isWorkerQueueDequeueDisabled(workerQueue)) {
387+
recordBlockedDequeue(workerQueue);
388+
return [];
389+
}
390+
382391
return await this._engine.dequeueFromWorkerQueue({
383392
consumerId: this.workerInstanceId,
384-
workerQueue: workerQueueForClass(this.masterQueue, queueClass),
393+
workerQueue,
385394
workerId: this.workerInstanceId,
386395
runnerId,
387396
});
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { describe, expect, it } from "vitest";
2+
import {
3+
matchesDisabledWorkerQueue,
4+
parseDisabledWorkerQueues,
5+
} from "~/runEngine/concerns/workerQueueSplit.server";
6+
7+
describe("parseDisabledWorkerQueues", () => {
8+
it("returns an empty set for undefined or empty input", () => {
9+
expect(parseDisabledWorkerQueues(undefined).size).toBe(0);
10+
expect(parseDisabledWorkerQueues("").size).toBe(0);
11+
expect(parseDisabledWorkerQueues(" , ,").size).toBe(0);
12+
});
13+
14+
it("splits, trims, and drops empties", () => {
15+
const parsed = parseDisabledWorkerQueues(" eu-central-1 , us-east-1:scheduled ,, ");
16+
expect([...parsed]).toEqual(["eu-central-1", "us-east-1:scheduled"]);
17+
});
18+
});
19+
20+
describe("matchesDisabledWorkerQueue", () => {
21+
it("never matches when the disabled set is empty", () => {
22+
const empty = parseDisabledWorkerQueues(undefined);
23+
expect(matchesDisabledWorkerQueue("eu-central-1", empty)).toBe(false);
24+
expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", empty)).toBe(false);
25+
});
26+
27+
it("gates the base region and its scheduled split when the base region is listed", () => {
28+
const disabled = parseDisabledWorkerQueues("eu-central-1");
29+
expect(matchesDisabledWorkerQueue("eu-central-1", disabled)).toBe(true);
30+
expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", disabled)).toBe(true);
31+
});
32+
33+
it("leaves other regions alone", () => {
34+
const disabled = parseDisabledWorkerQueues("eu-central-1");
35+
expect(matchesDisabledWorkerQueue("us-east-1", disabled)).toBe(false);
36+
expect(matchesDisabledWorkerQueue("us-east-1:scheduled", disabled)).toBe(false);
37+
});
38+
39+
it("gates only the scheduled split when a full worker queue is listed", () => {
40+
const disabled = parseDisabledWorkerQueues("eu-central-1:scheduled");
41+
expect(matchesDisabledWorkerQueue("eu-central-1:scheduled", disabled)).toBe(true);
42+
expect(matchesDisabledWorkerQueue("eu-central-1", disabled)).toBe(false);
43+
});
44+
});

0 commit comments

Comments
 (0)