Skip to content

Commit 950b2b4

Browse files
authored
Instant-admit free sessions when below per-model capacity (#530)
1 parent ad8bd4f commit 950b2b4

5 files changed

Lines changed: 210 additions & 0 deletions

File tree

web/src/app/api/v1/freebuff/session/__tests__/session.test.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ function makeSessionDeps(overrides: Partial<SessionDeps> = {}): SessionDeps & {
3737
rows,
3838
isWaitingRoomEnabled: () => true,
3939
graceMs: 30 * 60 * 1000,
40+
sessionLengthMs: 60 * 60 * 1000,
41+
// Keep instant-admit disabled in handler tests — they verify queue/state
42+
// transitions, not admission policy. With capacity 0 the deps below
43+
// aren't reached, so they're trivial stubs.
44+
getInstantAdmitCapacity: () => 0,
45+
activeCountForModel: async () => 0,
46+
promoteQueuedUser: async () => null,
4047
now: () => now,
4148
getSessionRow: async (userId) => rows.get(userId) ?? null,
4249
queueDepthsByModel: async () => {

web/src/server/free-session/__tests__/public-api.test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,27 @@ function makeDeps(overrides: Partial<SessionDeps> = {}): SessionDeps & {
3838
_now: () => currentNow,
3939
isWaitingRoomEnabled: () => true,
4040
graceMs: GRACE_MS,
41+
sessionLengthMs: SESSION_LEN,
42+
// Test default: instant-admit disabled (capacity 0) so existing FIFO
43+
// queue tests stay green. Tests that exercise instant admission opt in
44+
// via `getInstantAdmitCapacity: () => N`.
45+
getInstantAdmitCapacity: () => 0,
46+
activeCountForModel: async (model) => {
47+
let n = 0
48+
for (const r of rows.values()) {
49+
if (r.status === 'active' && r.model === model) n++
50+
}
51+
return n
52+
},
53+
promoteQueuedUser: async ({ userId, model, sessionLengthMs, now }) => {
54+
const row = rows.get(userId)
55+
if (!row || row.status !== 'queued' || row.model !== model) return null
56+
row.status = 'active'
57+
row.admitted_at = now
58+
row.expires_at = new Date(now.getTime() + sessionLengthMs)
59+
row.updated_at = now
60+
return row
61+
},
4162
now: () => currentNow,
4263
getSessionRow: async (userId) => rows.get(userId) ?? null,
4364
endSession: async (userId) => {
@@ -192,6 +213,63 @@ describe('requestSession', () => {
192213
if (second.status !== 'active') throw new Error('unreachable')
193214
expect(second.instanceId).not.toBe('inst-1') // rotated
194215
})
216+
217+
// Happy path: with a capacity of 3 and no active sessions, the very first
// request comes back `active` rather than `queued`.
test('instant-admit: below capacity admits the user in the same request', async () => {
  const deps = makeDeps({ getInstantAdmitCapacity: () => 3 })

  const result = await requestSession({
    userId: 'u1',
    model: DEFAULT_MODEL,
    deps,
  })

  expect(result.status).toBe('active')
  // Narrows the state union so `remainingMs` is accessible below.
  if (result.status !== 'active') throw new Error('unreachable')
  // Admitted at the frozen test clock, so the whole session length remains.
  expect(result.remainingMs).toBe(SESSION_LEN)

  // The stored row must be flipped as well, otherwise a follow-up
  // GET /session would still report the user as queued.
  const stored = deps.rows.get('u1')
  expect(stored?.status).toBe('active')
})
230+
231+
// Capacity 2: the first two joiners are admitted immediately, the third
// finds the model full and falls back to the queue.
test('instant-admit: queues once active-count reaches capacity', async () => {
  const deps = makeDeps({ getInstantAdmitCapacity: () => 2 })

  // Requests are issued strictly one after another so each sees the
  // active rows created by the previous one.
  const statuses: string[] = []
  for (const userId of ['u1', 'u2', 'u3']) {
    const state = await requestSession({
      userId,
      model: DEFAULT_MODEL,
      deps,
    })
    statuses.push(state.status)
  }

  const [first, second, third] = statuses
  expect(first).toBe('active')
  expect(second).toBe('active')
  expect(third).toBe('queued')
})
252+
253+
// Saturating one model must not affect admission for another model.
test('instant-admit: per-model capacities are independent', async () => {
  const otherModel = 'minimax/minimax-m2.7'
  // The default model gets a single slot; every other model gets ten.
  const deps = makeDeps({
    getInstantAdmitCapacity: (model) => {
      if (model === DEFAULT_MODEL) return 1
      return 10
    },
  })

  // u1 takes the only slot on the default model.
  await requestSession({ userId: 'u1', model: DEFAULT_MODEL, deps })

  const sameModel = await requestSession({
    userId: 'u2',
    model: DEFAULT_MODEL,
    deps,
  })
  const differentModel = await requestSession({
    userId: 'u3',
    model: otherModel,
    deps,
  })

  expect(sameModel.status).toBe('queued')
  expect(differentModel.status).toBe('active')
})
195273
})
196274

197275
describe('getSessionState', () => {

web/src/server/free-session/config.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,19 @@ export function getSessionLengthMs(): number {
3939
export function getSessionGraceMs(): number {
4040
return env.FREEBUFF_SESSION_GRACE_MS
4141
}
42+
43+
/**
 * Per-model instant-admit capacity: how many concurrent active sessions a
 * deployment can hold before new joiners fall back to the FIFO queue + tick.
 * Deployment-sizing knob — kept server-side so we can tune without bumping
 * the shared `common` package that the CLI consumes. Unknown ids → 0 (always
 * queue).
 */
const INSTANT_ADMIT_CAPACITY: Record<string, number> = {
  'z-ai/glm-5.1': 50,
  'minimax/minimax-m2.7': 200,
}

/**
 * Look up the instant-admit capacity configured for a model.
 *
 * @param id Model identifier, e.g. `'z-ai/glm-5.1'`.
 * @returns The configured capacity, or 0 when the id has no entry in
 *   `INSTANT_ADMIT_CAPACITY`.
 */
export function getInstantAdmitCapacity(id: string): number {
  // Own-property check: a bare `INSTANT_ADMIT_CAPACITY[id]` would also resolve
  // names inherited from Object.prototype ('constructor', 'toString',
  // '__proto__', ...) and hand back a function/object typed as `number`.
  if (!Object.prototype.hasOwnProperty.call(INSTANT_ADMIT_CAPACITY, id)) {
    return 0
  }
  return INSTANT_ADMIT_CAPACITY[id] ?? 0
}

web/src/server/free-session/public-api.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@ import {
44
} from '@codebuff/common/constants/freebuff-models'
55

66
import {
7+
getInstantAdmitCapacity,
78
getSessionGraceMs,
9+
getSessionLengthMs,
810
isWaitingRoomBypassedForEmail,
911
isWaitingRoomEnabled,
1012
} from './config'
1113
import {
14+
activeCountForModel,
1215
endSession,
1316
FreeSessionModelLockedError,
1417
getSessionRow,
1518
joinOrTakeOver,
19+
promoteQueuedUser,
1620
queueDepthsByModel,
1721
queuePositionFor,
1822
} from './store'
@@ -35,11 +39,28 @@ export interface SessionDeps {
3539
model: string
3640
queuedAt: Date
3741
}) => Promise<number>
42+
/** Instant-admit check: returns the number of active sessions currently
43+
* bound to a given model. Compared against the model's configured
44+
* capacity (see `getInstantAdmitCapacity`) to decide whether a new joiner skips the queue. */
45+
activeCountForModel: (model: string) => Promise<number>
46+
/** Instant-admit promotion: flips a specific queued row to active. Returns
47+
* the updated row or null if the row wasn't in a queued state. */
48+
promoteQueuedUser: (params: {
49+
userId: string
50+
model: string
51+
sessionLengthMs: number
52+
now: Date
53+
}) => Promise<InternalSessionRow | null>
54+
/** Per-model capacity lookup. Indirected through deps so tests can
55+
* force-enable / force-disable instant admit without mutating the
56+
* capacity table in `./config`. */
57+
getInstantAdmitCapacity: (model: string) => number
3858
isWaitingRoomEnabled: () => boolean
3959
/** Plain values, not getters: these never change at runtime. The deps
4060
* interface uses values rather than thunks so tests can pass numbers
4161
* inline without wrapping. */
4262
graceMs: number
63+
sessionLengthMs: number
4364
now?: () => Date
4465
}
4566

@@ -49,13 +70,19 @@ const defaultDeps: SessionDeps = {
4970
endSession,
5071
queueDepthsByModel,
5172
queuePositionFor,
73+
activeCountForModel,
74+
promoteQueuedUser,
75+
getInstantAdmitCapacity,
5276
isWaitingRoomEnabled,
5377
get graceMs() {
5478
// Read-through getter so test overrides via env still work; the value
5579
// itself is materialized once per call. Cheaper than a thunk because
5680
// callers don't have to invoke a function.
5781
return getSessionGraceMs()
5882
},
83+
get sessionLengthMs() {
84+
return getSessionLengthMs()
85+
},
5986
}
6087

6188
/** Current time for this call: the injected `deps.now` clock when one is
 *  provided, otherwise a fresh `new Date()`. */
const nowOf = (deps: SessionDeps): Date => {
  const clock = deps.now ?? (() => new Date())
  return clock()
}
@@ -145,6 +172,33 @@ export async function requestSession(params: {
145172
}
146173
throw err
147174
}
175+
176+
// Instant-admit: if the model has spare capacity (fewer active sessions
177+
// than `deps.getInstantAdmitCapacity(model)` allows), skip the waiting room
178+
// entirely and flip the user to active in this same request. The tick
179+
// + FIFO queue only engage once we hit the threshold, so backpressure
180+
// kicks in exactly when the deployment needs it.
181+
//
182+
// Race note: two concurrent joiners may each see `active < capacity`
183+
// and both get admitted, overshooting the cap by up to `concurrency - 1`.
184+
// Capacities are chosen with headroom for this, and the configured
185+
// value is a comfort threshold not a hard ceiling.
186+
if (row.status === 'queued') {
187+
const capacity = deps.getInstantAdmitCapacity(model)
188+
if (capacity > 0) {
189+
const activeCount = await deps.activeCountForModel(model)
190+
if (activeCount < capacity) {
191+
const promoted = await deps.promoteQueuedUser({
192+
userId: params.userId,
193+
model,
194+
sessionLengthMs: deps.sessionLengthMs,
195+
now: nowOf(deps),
196+
})
197+
if (promoted) row = promoted
198+
}
199+
}
200+
}
201+
148202
const view = await viewForRow(params.userId, deps, row)
149203
if (!view) {
150204
throw new Error(

web/src/server/free-session/store.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,24 @@ export async function queueDepthsByModel(): Promise<Record<string, number>> {
176176
return out
177177
}
178178

179+
/**
 * Number of `freeSession` rows whose status is `active` for a single model.
 * This is the figure the instant-admit path compares against the model's
 * capacity. It issues one filtered COUNT query, so callers that need only
 * one model's figure don't have to go through `activeCountsByModel`.
 *
 * NOTE(review): the filter is on `status` only; whether already-expired rows
 * can still be `active` here depends on sweeping logic not visible in this
 * block — confirm.
 *
 * @param model Model id to count active sessions for.
 * @returns The active-row count, or 0 when the query yields no row.
 */
export async function activeCountForModel(model: string): Promise<number> {
  const activeForModel = and(
    eq(schema.freeSession.status, 'active'),
    eq(schema.freeSession.model, model),
  )
  const [result] = await db
    .select({ n: count() })
    .from(schema.freeSession)
    .where(activeForModel)
  // Normalise through Number() in case the driver hands the count back as a
  // string; a missing row counts as zero.
  return Number(result?.n ?? 0)
}
196+
179197
/**
180198
* Single-query read of active-row counts bucketed by model. Mirrors
181199
* `queueDepthsByModel` so the admission tick can log per-model utilization
@@ -333,6 +351,43 @@ export async function admitFromQueue(params: {
333351
})
334352
}
335353

354+
/**
 * Flip one specific user's queued row to `active`. This is the write half
 * of the instant-admit path in `requestSession`, taken when the model is
 * below its configured capacity.
 *
 * Unlike the FIFO admission path, this takes no advisory lock. The UPDATE
 * is filtered on (user_id, status = 'queued', model), so it matches nothing
 * if the row has already left the queue or is bound to a different model.
 *
 * @param params.userId          Owner of the row to promote.
 * @param params.model           Model the row must currently be queued for.
 * @param params.sessionLengthMs Session duration; `expires_at` becomes `now + sessionLengthMs`.
 * @param params.now             Timestamp written to `admitted_at` and `updated_at`.
 * @returns The updated row, or null when no queued row for that user and
 *   model matched.
 */
export async function promoteQueuedUser(params: {
  userId: string
  model: string
  sessionLengthMs: number
  now: Date
}): Promise<InternalSessionRow | null> {
  const sessionEnd = new Date(params.now.getTime() + params.sessionLengthMs)

  // Only a row that is still queued for this exact user and model qualifies.
  const stillQueuedForModel = and(
    eq(schema.freeSession.user_id, params.userId),
    eq(schema.freeSession.status, 'queued'),
    eq(schema.freeSession.model, params.model),
  )

  const updated = await db
    .update(schema.freeSession)
    .set({
      status: 'active',
      admitted_at: params.now,
      expires_at: sessionEnd,
      updated_at: params.now,
    })
    .where(stillQueuedForModel)
    .returning()

  const promoted = updated[0] as InternalSessionRow | undefined
  return promoted ?? null
}
390+
336391
/** Stable 31-bit hash so model-keyed advisory lock ids don't overflow int4. */
337392
function hashStringToInt32(s: string): number {
338393
let h = 0

0 commit comments

Comments
 (0)