Skip to content

Commit 650f025

Browse files
committed
refactor(mollifier): drop the redundant mollifier:envs SET
With the drainer walking listOrgs → listEnvsForOrg → pop, the flat mollifier:envs SET has no consumer — `mollifier:orgs` and the per-org `mollifier:org-envs:${orgId}` SETs cover everything the drainer needs. Removing it drops three Lua write ops per accept/pop/requeue and one Redis key per active env. Changes: - Lua: acceptMollifierEntry, popAndMarkDraining, requeueMollifierEntry no longer touch mollifier:envs. Their KEYS arrays shrink by one. - TS: listEnvs() method removed; only listOrgs() and listEnvsForOrg() remain. TS bindings updated to match the new arg shapes. - buffer.test.ts: listEnvs() assertions converted to listEnvsForOrg( "org_1") so they verify the equivalent org-level membership. The "stale envs SET cleanup on empty-pop" test is removed (envs SET is gone). The "pop skips orphans" test's trailing-cleanup assertion is updated to document the deliberate stale-tolerance in the no-runId branch of popAndMarkDraining (can't read orgId without a popped entry, so org-envs cleanup is skipped there). - drainer.test.ts: stub helper moved to module scope and gains an `eachEnvAsOwnOrg(envs)` convenience that supplies listOrgs + listEnvsForOrg in tests where each env is its own org. Stub helpers duplicated across describe blocks are removed in favour of the shared one. 24/24 drainer tests pass; buffer tests pass in isolation (a few timeout under full-suite contention against the shared redis container — unrelated to this change).
1 parent a1a0a85 commit 650f025

3 files changed

Lines changed: 114 additions & 224 deletions

File tree

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 25 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ describe("MollifierBuffer.accept", () => {
8989
expect(entry!.attempts).toBe(0);
9090
expect(entry!.createdAt).toBeInstanceOf(Date);
9191

92-
const envs = await buffer.listEnvs();
92+
const envs = await buffer.listEnvsForOrg("org_1");
9393
expect(envs).toContain("env_a");
9494
} finally {
9595
await buffer.close();
@@ -211,7 +211,6 @@ describe("MollifierBuffer.pop orphan handling", () => {
211211
try {
212212
// Simulate a TTL-expired orphan: queue ref exists, entry hash does not.
213213
await buffer["redis"].lpush("mollifier:queue:env_a", "run_orphan");
214-
await buffer["redis"].sadd("mollifier:envs", "env_a");
215214

216215
const popped = await buffer.pop("env_a");
217216
expect(popped).toBeNull();
@@ -220,10 +219,9 @@ describe("MollifierBuffer.pop orphan handling", () => {
220219
const raw = await buffer["redis"].hgetall("mollifier:entries:run_orphan");
221220
expect(Object.keys(raw)).toHaveLength(0);
222221

223-
// Queue and envs set are both cleaned up.
222+
// Queue is drained — the loop pops orphans until empty.
224223
const qLen = await buffer["redis"].llen("mollifier:queue:env_a");
225224
expect(qLen).toBe(0);
226-
expect(await buffer.listEnvs()).not.toContain("env_a");
227225
} finally {
228226
await buffer.close();
229227
}
@@ -261,10 +259,15 @@ describe("MollifierBuffer.pop orphan handling", () => {
261259
const remaining = await buffer["redis"].llen("mollifier:queue:env_a");
262260
expect(remaining).toBe(1);
263261

264-
// A second pop drains it and SREMs the env (no more valid entries).
262+
// A second pop drains the trailing orphan_b. The queue is now
263+
// empty. NOTE: the pop's no-runId branch can't read orgId from
264+
// a popped entry (it never got one), so it doesn't prune the
265+
// org-envs SET. env_a remains in `mollifier:org-envs:org_1` as
266+
// a stale entry until the next accept-or-success-pop cycle
267+
// recovers it. This is the deliberate trade-off documented in
268+
// popAndMarkDraining's Lua.
265269
const second = await buffer.pop("env_a");
266270
expect(second).toBeNull();
267-
expect(await buffer.listEnvs()).not.toContain("env_a");
268271
} finally {
269272
await buffer.close();
270273
}
@@ -444,7 +447,7 @@ describe("MollifierBuffer.requeue on missing entry", () => {
444447
// Critical: no queue keys were created from this no-op requeue.
445448
const queueKeys = await buffer["redis"].keys("mollifier:queue:*");
446449
expect(queueKeys).toHaveLength(0);
447-
const envs = await buffer.listEnvs();
450+
const envs = await buffer.listEnvsForOrg("org_1");
448451
expect(envs).toHaveLength(0);
449452
} finally {
450453
await buffer.close();
@@ -742,36 +745,36 @@ describe("MollifierBuffer entry lifecycle invariants", () => {
742745

743746
try {
744747
// Empty start
745-
expect(await buffer.listEnvs()).not.toContain("env_lc");
748+
expect(await buffer.listEnvsForOrg("org_1")).not.toContain("env_lc");
746749

747750
// accept → SADD
748751
await buffer.accept({ runId: "r1", envId: "env_lc", orgId: "org_1", payload: "{}" });
749-
expect(await buffer.listEnvs()).toContain("env_lc");
752+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_lc");
750753

751754
// second accept (different runId) → still SADD (idempotent)
752755
await buffer.accept({ runId: "r2", envId: "env_lc", orgId: "org_1", payload: "{}" });
753-
expect(await buffer.listEnvs()).toContain("env_lc");
756+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_lc");
754757

755758
// pop r1 → queue still has r2 → env stays
756759
await buffer.pop("env_lc");
757-
expect(await buffer.listEnvs()).toContain("env_lc");
760+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_lc");
758761

759762
// ack r1 → no queue change, env still tracked (r2 still queued)
760763
await buffer.ack("r1");
761-
expect(await buffer.listEnvs()).toContain("env_lc");
764+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_lc");
762765

763766
// pop r2 → queue empties → SREM
764767
await buffer.pop("env_lc");
765-
expect(await buffer.listEnvs()).not.toContain("env_lc");
768+
expect(await buffer.listEnvsForOrg("org_1")).not.toContain("env_lc");
766769

767770
// requeue r2 → SADD back
768771
await buffer.requeue("r2");
769-
expect(await buffer.listEnvs()).toContain("env_lc");
772+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_lc");
770773

771774
// fail r2 → entry FAILED but queue empty → next pop should SREM
772775
await buffer.pop("env_lc");
773776
await buffer.fail("r2", { code: "X", message: "boom" });
774-
const afterFailEnvs = await buffer.listEnvs();
777+
const afterFailEnvs = await buffer.listEnvsForOrg("org_1");
775778
// Queue is empty, env was SREM'd by the pop above.
776779
expect(afterFailEnvs).not.toContain("env_lc");
777780
} finally {
@@ -953,10 +956,10 @@ describe("MollifierBuffer envs set lifecycle", () => {
953956

954957
try {
955958
await buffer.accept({ runId: "r1", envId: "env_a", orgId: "org_1", payload: "{}" });
956-
expect(await buffer.listEnvs()).toContain("env_a");
959+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_a");
957960

958961
await buffer.pop("env_a");
959-
expect(await buffer.listEnvs()).not.toContain("env_a");
962+
expect(await buffer.listEnvsForOrg("org_1")).not.toContain("env_a");
960963
} finally {
961964
await buffer.close();
962965
}
@@ -980,42 +983,13 @@ describe("MollifierBuffer envs set lifecycle", () => {
980983
try {
981984
await buffer.accept({ runId: "r1", envId: "env_a", orgId: "org_1", payload: "{}" });
982985
await buffer.accept({ runId: "r2", envId: "env_a", orgId: "org_1", payload: "{}" });
983-
expect(await buffer.listEnvs()).toContain("env_a");
986+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_a");
984987

985988
await buffer.pop("env_a");
986-
expect(await buffer.listEnvs()).toContain("env_a");
989+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_a");
987990

988991
await buffer.pop("env_a");
989-
expect(await buffer.listEnvs()).not.toContain("env_a");
990-
} finally {
991-
await buffer.close();
992-
}
993-
},
994-
);
995-
996-
redisTest(
997-
"pop on an empty queue SREMs the envId opportunistically",
998-
{ timeout: 20_000 },
999-
async ({ redisContainer }) => {
1000-
const buffer = new MollifierBuffer({
1001-
redisOptions: {
1002-
host: redisContainer.getHost(),
1003-
port: redisContainer.getPort(),
1004-
password: redisContainer.getPassword(),
1005-
},
1006-
entryTtlSeconds: 600,
1007-
logger: new Logger("test", "log"),
1008-
});
1009-
1010-
try {
1011-
// Manually SADD an env without any queued entries (simulates leftover
1012-
// from a pre-fix run, or a manual touch). pop should clean it up.
1013-
await buffer["redis"].sadd("mollifier:envs", "env_orphan");
1014-
expect(await buffer.listEnvs()).toContain("env_orphan");
1015-
1016-
const popped = await buffer.pop("env_orphan");
1017-
expect(popped).toBeNull();
1018-
expect(await buffer.listEnvs()).not.toContain("env_orphan");
992+
expect(await buffer.listEnvsForOrg("org_1")).not.toContain("env_a");
1019993
} finally {
1020994
await buffer.close();
1021995
}
@@ -1040,11 +1014,11 @@ describe("MollifierBuffer envs set lifecycle", () => {
10401014
await buffer.accept({ runId: "r1", envId: "env_a", orgId: "org_1", payload: "{}" });
10411015
await buffer.pop("env_a");
10421016
// Queue drained → env_a SREM'd.
1043-
expect(await buffer.listEnvs()).not.toContain("env_a");
1017+
expect(await buffer.listEnvsForOrg("org_1")).not.toContain("env_a");
10441018

10451019
await buffer.requeue("r1");
10461020
// requeue must put env_a back so the drainer notices the retry.
1047-
expect(await buffer.listEnvs()).toContain("env_a");
1021+
expect(await buffer.listEnvsForOrg("org_1")).toContain("env_a");
10481022
} finally {
10491023
await buffer.close();
10501024
}

packages/redis-worker/src/mollifier/buffer.ts

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,11 @@ export class MollifierBuffer {
5252
}): Promise<boolean> {
5353
const entryKey = `mollifier:entries:${input.runId}`;
5454
const queueKey = `mollifier:queue:${input.envId}`;
55-
const envsKey = "mollifier:envs";
5655
const orgsKey = "mollifier:orgs";
5756
const createdAt = new Date().toISOString();
5857
const result = await this.redis.acceptMollifierEntry(
5958
entryKey,
6059
queueKey,
61-
envsKey,
6260
orgsKey,
6361
input.runId,
6462
input.envId,
@@ -73,12 +71,10 @@ export class MollifierBuffer {
7371

7472
async pop(envId: string): Promise<BufferEntry | null> {
7573
const queueKey = `mollifier:queue:${envId}`;
76-
const envsKey = "mollifier:envs";
7774
const orgsKey = "mollifier:orgs";
7875
const entryPrefix = "mollifier:entries:";
7976
const encoded = (await this.redis.popAndMarkDraining(
8077
queueKey,
81-
envsKey,
8278
orgsKey,
8379
entryPrefix,
8480
envId,
@@ -120,16 +116,10 @@ export class MollifierBuffer {
120116
return parsed.data;
121117
}
122118

123-
// Flat list of envs with active entries. Kept for inspection and the
124-
// org-walk fallback; the drainer walks orgs → envs-for-org instead.
125-
async listEnvs(): Promise<string[]> {
126-
return this.redis.smembers("mollifier:envs");
127-
}
128-
129119
// Drainer walks these two methods to schedule pops with org-level
130120
// fairness: one env per org per tick. The Lua scripts maintain both
131-
// sets atomically with the per-env queues, so an env appears here
132-
// exactly when its queue has at least one entry.
121+
// sets atomically with the per-env queues, so an org/env appears here
122+
// exactly when at least one of its envs has a queued entry.
133123
async listOrgs(): Promise<string[]> {
134124
return this.redis.smembers("mollifier:orgs");
135125
}
@@ -145,7 +135,6 @@ export class MollifierBuffer {
145135
async requeue(runId: string): Promise<void> {
146136
await this.redis.requeueMollifierEntry(
147137
`mollifier:entries:${runId}`,
148-
"mollifier:envs",
149138
"mollifier:orgs",
150139
"mollifier:queue:",
151140
runId,
@@ -191,12 +180,11 @@ export class MollifierBuffer {
191180

192181
#registerCommands() {
193182
this.redis.defineCommand("acceptMollifierEntry", {
194-
numberOfKeys: 4,
183+
numberOfKeys: 3,
195184
lua: `
196185
local entryKey = KEYS[1]
197186
local queueKey = KEYS[2]
198-
local envsKey = KEYS[3]
199-
local orgsKey = KEYS[4]
187+
local orgsKey = KEYS[3]
200188
local runId = ARGV[1]
201189
local envId = ARGV[2]
202190
local orgId = ARGV[3]
@@ -222,9 +210,8 @@ export class MollifierBuffer {
222210
'createdAt', createdAt)
223211
redis.call('EXPIRE', entryKey, ttlSeconds)
224212
redis.call('LPUSH', queueKey, runId)
225-
redis.call('SADD', envsKey, envId)
226213
-- Org-level membership: maintained atomically with the per-env
227-
-- queue/SET so the drainer can walk orgs → envs-for-org and
214+
-- queue so the drainer can walk orgs → envs-for-org and
228215
-- schedule one env per org per tick. SADDs are idempotent if the
229216
-- org/env are already tracked.
230217
redis.call('SADD', orgsKey, orgId)
@@ -234,11 +221,10 @@ export class MollifierBuffer {
234221
});
235222

236223
this.redis.defineCommand("requeueMollifierEntry", {
237-
numberOfKeys: 3,
224+
numberOfKeys: 2,
238225
lua: `
239226
local entryKey = KEYS[1]
240-
local envsKey = KEYS[2]
241-
local orgsKey = KEYS[3]
227+
local orgsKey = KEYS[2]
242228
local queuePrefix = ARGV[1]
243229
local runId = ARGV[2]
244230
local orgEnvsPrefix = ARGV[3]
@@ -254,10 +240,9 @@ export class MollifierBuffer {
254240
255241
redis.call('HSET', entryKey, 'status', 'QUEUED', 'attempts', tostring(nextAttempts))
256242
redis.call('LPUSH', queuePrefix .. envId, runId)
257-
-- Re-track the env/org: pop may have SREM'd them when the queue
243+
-- Re-track the org/env: pop may have SREM'd them when the queue
258244
-- last emptied. SADDs are idempotent if the values are still
259245
-- present.
260-
redis.call('SADD', envsKey, envId)
261246
if orgId then
262247
redis.call('SADD', orgsKey, orgId)
263248
redis.call('SADD', orgEnvsPrefix .. orgId, envId)
@@ -267,11 +252,10 @@ export class MollifierBuffer {
267252
});
268253

269254
this.redis.defineCommand("popAndMarkDraining", {
270-
numberOfKeys: 3,
255+
numberOfKeys: 2,
271256
lua: `
272257
local queueKey = KEYS[1]
273-
local envsKey = KEYS[2]
274-
local orgsKey = KEYS[3]
258+
local orgsKey = KEYS[2]
275259
local entryPrefix = ARGV[1]
276260
local envId = ARGV[2]
277261
local orgEnvsPrefix = ARGV[3]
@@ -297,14 +281,9 @@ export class MollifierBuffer {
297281
while true do
298282
local runId = redis.call('RPOP', queueKey)
299283
if not runId then
300-
-- Queue is empty; opportunistically prune envs set. SREM is safe
301-
-- under concurrent LPUSH: accept SADDs the env back atomically.
302-
-- Org-level cleanup is skipped here because we don't know orgId
303-
-- without an entry to read from. Stale org-envs entries are
304-
-- bounded by env count and recovered on the next accept.
305-
if redis.call('LLEN', queueKey) == 0 then
306-
redis.call('SREM', envsKey, envId)
307-
end
284+
-- Queue is empty AND we have no entry to read orgId from, so
285+
-- skip org-level cleanup. Stale org-envs entries are bounded
286+
-- by env count and recovered on the next accept.
308287
return nil
309288
end
310289
@@ -316,11 +295,10 @@ export class MollifierBuffer {
316295
for i = 1, #raw, 2 do
317296
result[raw[i]] = raw[i + 1]
318297
end
319-
-- Prune envs/orgs/org-envs sets if this pop drained the queue.
298+
-- Prune org-level membership if this pop drained the queue.
320299
-- Atomic with the RPOP above — a concurrent accept AFTER this
321-
-- script will SADD all three back along with its LPUSH.
300+
-- script will SADD both back along with its LPUSH.
322301
if redis.call('LLEN', queueKey) == 0 then
323-
redis.call('SREM', envsKey, envId)
324302
pruneOrgMembership(result['orgId'])
325303
end
326304
return cjson.encode(result)
@@ -378,7 +356,6 @@ declare module "@internal/redis" {
378356
acceptMollifierEntry(
379357
entryKey: string,
380358
queueKey: string,
381-
envsKey: string,
382359
orgsKey: string,
383360
runId: string,
384361
envId: string,
@@ -391,7 +368,6 @@ declare module "@internal/redis" {
391368
): Result<number, Context>;
392369
popAndMarkDraining(
393370
queueKey: string,
394-
envsKey: string,
395371
orgsKey: string,
396372
entryPrefix: string,
397373
envId: string,
@@ -400,7 +376,6 @@ declare module "@internal/redis" {
400376
): Result<string | null, Context>;
401377
requeueMollifierEntry(
402378
entryKey: string,
403-
envsKey: string,
404379
orgsKey: string,
405380
queuePrefix: string,
406381
runId: string,

0 commit comments

Comments
 (0)