Skip to content

Commit 2c82d4c

Browse files
authored
feat(supervisor): add cluster pod-count dequeue backpressure source (#4027)
Adds an in-process backpressure signal that pauses dequeuing when the Kubernetes cluster is saturated, so work overflows cheaply in the queue instead of piling up as unschedulable pods. Saturation is read by scraping the apiserver's total pod-object count (`apiserver_storage_objects{resource="pods"}`) and applying an engage/release threshold with hysteresis - a single lightweight aggregate scrape, not a pod listing. Backpressure sources are now evaluated independently and OR'd: each source has its own enable and dry-run flag, and the supervisor engages if any enabled source trips. This adds the pod-count source alongside the existing one without changing it, and is extensible to more sources later. Off by default. The scrape uses the in-cluster kubeconfig over `https` so TLS verifies against the cluster CA (the fetch-options helper attaches the CA as an `https.Agent`, which the global `fetch` ignores - that path silently dropped the CA). Enabling the pod-count source requires the supervisor's service account to be granted `get` on the `/metrics` non-resource URL; that RBAC and the per-deployment env wiring are operator-side and live elsewhere. New config (pod-count source): `TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED` (default false), `_POD_COUNT_DRY_RUN` (default true), `_POD_COUNT_ENGAGE` / `_POD_COUNT_RELEASE` (hysteresis thresholds), `_POD_COUNT_REFRESH_MS` (scrape interval, default 5s). The existing source's flags are unchanged. Observability: a `supervisor_cluster_pod_count` gauge, and the pod-count monitor's metrics are namespaced (`supervisor_backpressure_pod_count_*`) so the existing backpressure metrics keep their names.
1 parent 5667461 commit 2c82d4c

7 files changed

Lines changed: 352 additions & 26 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: supervisor
3+
type: feature
4+
---
5+
6+
The supervisor can pause dequeuing when the Kubernetes cluster is saturated, based on the cluster's total pod count. Opt-in and off by default.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import { describe, it, expect } from "vitest";
2+
import { parsePodCount, K8sPodCountSignalSource } from "./k8sPodCountSignalSource.js";
3+
4+
describe("parsePodCount", () => {
5+
it("reads the pods object count", () => {
6+
const text = [
7+
"# HELP apiserver_storage_objects Number of stored objects",
8+
"# TYPE apiserver_storage_objects gauge",
9+
'apiserver_storage_objects{resource="pods"} 8421',
10+
'apiserver_storage_objects{resource="configmaps"} 17',
11+
].join("\n");
12+
expect(parsePodCount(text)).toBe(8421);
13+
});
14+
15+
it("is tolerant of extra labels in any order", () => {
16+
const text = 'apiserver_storage_objects{group="",resource="pods",extra="x"} 12';
17+
expect(parsePodCount(text)).toBe(12);
18+
});
19+
20+
it("parses scientific notation", () => {
21+
const text = 'apiserver_storage_objects{resource="pods"} 1.2e+04';
22+
expect(parsePodCount(text)).toBe(12000);
23+
});
24+
25+
it("throws when the pods metric is absent", () => {
26+
const text = 'apiserver_storage_objects{resource="configmaps"} 17';
27+
expect(() => parsePodCount(text)).toThrow(/not found/);
28+
});
29+
30+
it("throws on a non-finite value (e.g. 1e999)", () => {
31+
const text = 'apiserver_storage_objects{resource="pods"} 1e999';
32+
expect(() => parsePodCount(text)).toThrow();
33+
});
34+
35+
it("throws on a negative value", () => {
36+
const text = 'apiserver_storage_objects{resource="pods"} -5';
37+
expect(() => parsePodCount(text)).toThrow();
38+
});
39+
});
40+
41+
function metrics(count: number): string {
42+
return `apiserver_storage_objects{resource="pods"} ${count}`;
43+
}
44+
45+
describe("K8sPodCountSignalSource", () => {
46+
it("engages at the engage threshold and reports the count", async () => {
47+
const counts: number[] = [];
48+
const source = new K8sPodCountSignalSource({
49+
fetchMetrics: async () => metrics(10000),
50+
engageThreshold: 10000,
51+
releaseThreshold: 5000,
52+
reportPodCount: (c) => counts.push(c),
53+
});
54+
const verdict = await source.read();
55+
expect(verdict.engaged).toBe(true);
56+
expect(typeof verdict.ts).toBe("number");
57+
expect(counts).toEqual([10000]);
58+
});
59+
60+
it("does not engage below the engage threshold", async () => {
61+
const source = new K8sPodCountSignalSource({
62+
fetchMetrics: async () => metrics(9999),
63+
engageThreshold: 10000,
64+
releaseThreshold: 5000,
65+
});
66+
expect((await source.read()).engaged).toBe(false);
67+
});
68+
69+
it("stays engaged in the hysteresis band, releases only below release threshold", async () => {
70+
let count = 10000;
71+
const source = new K8sPodCountSignalSource({
72+
fetchMetrics: async () => metrics(count),
73+
engageThreshold: 10000,
74+
releaseThreshold: 5000,
75+
});
76+
expect((await source.read()).engaged).toBe(true); // engage
77+
count = 7000;
78+
expect((await source.read()).engaged).toBe(true); // band -> still engaged
79+
count = 4999;
80+
expect((await source.read()).engaged).toBe(false); // below release -> off
81+
count = 7000;
82+
expect((await source.read()).engaged).toBe(false); // band again -> stays off
83+
});
84+
85+
it("propagates scrape failures (monitor fails open on throw)", async () => {
86+
const source = new K8sPodCountSignalSource({
87+
fetchMetrics: async () => {
88+
throw new Error("connection refused");
89+
},
90+
engageThreshold: 10000,
91+
releaseThreshold: 5000,
92+
});
93+
await expect(source.read()).rejects.toThrow("connection refused");
94+
});
95+
});
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import type { BackpressureSignalSource, BackpressureVerdict } from "./backpressureMonitor.js";
2+
3+
// Reads the apiserver's stored-pod-object count from a Prometheus /metrics scrape.
4+
const POD_COUNT_RE = /^apiserver_storage_objects\{[^}]*resource="pods"[^}]*\}\s+([0-9.eE+]+)/m;
5+
6+
export function parsePodCount(metricsText: string): number {
7+
const match = metricsText.match(POD_COUNT_RE);
8+
if (!match) {
9+
throw new Error('apiserver_storage_objects{resource="pods"} not found in metrics');
10+
}
11+
const value = Number(match[1]);
12+
if (!Number.isFinite(value)) {
13+
throw new Error(`unparseable pod count: ${match[1]}`);
14+
}
15+
return value;
16+
}
17+
18+
export type K8sPodCountSignalSourceOptions = {
19+
fetchMetrics: () => Promise<string>;
20+
engageThreshold: number;
21+
releaseThreshold: number;
22+
reportPodCount?: (count: number) => void;
23+
};
24+
25+
// Engage/release with hysteresis so a count hovering near the line doesn't flap.
26+
export class K8sPodCountSignalSource implements BackpressureSignalSource {
27+
private engaged = false;
28+
29+
constructor(private readonly opts: K8sPodCountSignalSourceOptions) {}
30+
31+
async read(): Promise<BackpressureVerdict> {
32+
const text = await this.opts.fetchMetrics();
33+
const count = parsePodCount(text);
34+
this.opts.reportPodCount?.(count);
35+
36+
if (this.engaged) {
37+
if (count < this.opts.releaseThreshold) {
38+
this.engaged = false;
39+
}
40+
} else if (count >= this.opts.engageThreshold) {
41+
this.engaged = true;
42+
}
43+
44+
return { engaged: this.engaged, ts: Date.now() };
45+
}
46+
}

apps/supervisor/src/clients/kubernetes.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import * as https from "node:https";
12
import * as k8s from "@kubernetes/client-node";
23
import { Informer } from "@kubernetes/client-node";
34
import { ListPromise } from "@kubernetes/client-node";
@@ -53,3 +54,57 @@ function getKubeConfig() {
5354
}
5455

5556
export { k8s };
57+
58+
/**
59+
* Builds a function that scrapes the apiserver's Prometheus /metrics endpoint.
60+
* One lightweight aggregate read - not a pod listing. Requires the service
61+
* account to be granted GET on the /metrics non-resource URL.
62+
*/
63+
export function createApiserverMetricsFetcher(timeoutMs: number): () => Promise<string> {
64+
const kubeConfig = getKubeConfig();
65+
66+
return async () => {
67+
const cluster = kubeConfig.getCurrentCluster();
68+
if (!cluster) {
69+
throw new Error("no current cluster in kubeconfig");
70+
}
71+
const url = new URL(`${cluster.server}/metrics`);
72+
const opts: https.RequestOptions = {
73+
method: "GET",
74+
protocol: url.protocol,
75+
hostname: url.hostname,
76+
port: url.port,
77+
path: url.pathname,
78+
};
79+
// applyToHTTPSOptions sets the cluster CA, client cert/key, and auth headers
80+
// (incl. exec plugins) on the request - so TLS verifies against the cluster
81+
// CA, not the system store. The fetch-options path attaches the CA as an
82+
// https.Agent, which global fetch (undici) ignores.
83+
await kubeConfig.applyToHTTPSOptions(opts);
84+
85+
return new Promise<string>((resolve, reject) => {
86+
const req = https.request(opts, (res) => {
87+
const status = res.statusCode ?? 0;
88+
let body = "";
89+
res.setEncoding("utf8");
90+
res.on("data", (chunk) => {
91+
body += chunk;
92+
});
93+
res.on("end", () => {
94+
if (status >= 200 && status < 300) {
95+
resolve(body);
96+
} else {
97+
reject(new Error(`apiserver /metrics scrape failed: ${status}`));
98+
}
99+
});
100+
});
101+
// Without this a hung connect/TLS/read never settles, and the monitor's
102+
// refreshInFlight guard would freeze the source (silent fail-open).
103+
req.setTimeout(timeoutMs, () => {
104+
req.destroy(new Error(`apiserver /metrics scrape timed out after ${timeoutMs}ms`));
105+
});
106+
req.on("error", reject);
107+
req.end();
108+
});
109+
};
110+
}

apps/supervisor/src/env.test.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { describe, it, expect, vi } from "vitest";
2+
3+
// Mock std-env before importing env.ts so the module-level `Env.parse(stdEnv)`
4+
// doesn't fail in a test environment that lacks required vars.
5+
vi.mock("std-env", () => ({
6+
env: {
7+
TRIGGER_API_URL: "http://localhost:3030",
8+
TRIGGER_WORKER_TOKEN: "test-token",
9+
MANAGED_WORKER_SECRET: "test-secret",
10+
OTEL_EXPORTER_OTLP_ENDPOINT: "http://localhost:4318",
11+
},
12+
}));
13+
14+
const { Env } = await import("./env.js");
15+
16+
// Minimal env that satisfies all required fields; everything else has defaults.
17+
const base = {
18+
TRIGGER_API_URL: "http://localhost:3030",
19+
TRIGGER_WORKER_TOKEN: "test-token",
20+
MANAGED_WORKER_SECRET: "test-secret",
21+
OTEL_EXPORTER_OTLP_ENDPOINT: "http://localhost:4318",
22+
};
23+
24+
describe("Env superRefine - backpressure source awareness", () => {
25+
it("pod-count source can be enabled without a Redis host", () => {
26+
expect(() =>
27+
Env.parse({
28+
...base,
29+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true",
30+
})
31+
).not.toThrow();
32+
});
33+
34+
it("redis source requires a Redis host", () => {
35+
expect(() =>
36+
Env.parse({
37+
...base,
38+
TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true",
39+
})
40+
).toThrow();
41+
});
42+
43+
it("both sources can be enabled together (with a Redis host)", () => {
44+
expect(() =>
45+
Env.parse({
46+
...base,
47+
TRIGGER_DEQUEUE_BACKPRESSURE_ENABLED: "true",
48+
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_HOST: "localhost",
49+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true",
50+
})
51+
).not.toThrow();
52+
});
53+
54+
it("rejects pod-count release >= engage when the source is enabled", () => {
55+
expect(() =>
56+
Env.parse({
57+
...base,
58+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: "true",
59+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: "100",
60+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: "100",
61+
})
62+
).toThrow();
63+
});
64+
});

apps/supervisor/src/env.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { env as stdEnv } from "std-env";
33
import { z } from "zod";
44
import { AdditionalEnvVars, BoolEnv } from "./envUtil.js";
55

6-
const Env = z
6+
export const Env = z
77
.object({
88
// This will come from `spec.nodeName` in k8s
99
TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()),
@@ -73,6 +73,18 @@ const Env = z
7373
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_USERNAME: z.string().optional(),
7474
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_PASSWORD: z.string().optional(),
7575
TRIGGER_DEQUEUE_BACKPRESSURE_REDIS_TLS_DISABLED: BoolEnv.default(false),
76+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED: BoolEnv.default(false),
77+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_DRY_RUN: BoolEnv.default(true),
78+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE: z.coerce.number().int().positive().default(10_000),
79+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE: z.coerce.number().int().positive().default(5_000),
80+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_REFRESH_MS: z.coerce.number().int().positive().default(5_000),
81+
// Hard timeout on the apiserver /metrics scrape. A hung request would otherwise
82+
// never settle and freeze the monitor's refresh loop (fail-open silently).
83+
TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_SCRAPE_TIMEOUT_MS: z.coerce
84+
.number()
85+
.int()
86+
.positive()
87+
.default(10_000),
7688

7789
// Optional services
7890
TRIGGER_WARM_START_URL: z.string().optional(),
@@ -312,6 +324,18 @@ const Env = z
312324
TRIGGER_WIDE_EVENTS_NOISY_ROUTES: BoolEnv.default(false),
313325
})
314326
.superRefine((data, ctx) => {
327+
if (
328+
data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENABLED &&
329+
data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE >=
330+
data.TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE
331+
) {
332+
ctx.addIssue({
333+
code: z.ZodIssueCode.custom,
334+
message:
335+
"TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE must be less than TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_ENGAGE",
336+
path: ["TRIGGER_DEQUEUE_BACKPRESSURE_POD_COUNT_RELEASE"],
337+
});
338+
}
315339
if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) {
316340
ctx.addIssue({
317341
code: z.ZodIssueCode.custom,

0 commit comments

Comments
 (0)