Skip to content

Commit fce31b8

Browse files
committed
ensure billed
1 parent 84aca63 commit fce31b8

File tree

1 file changed

+221
-2
lines changed

1 file changed

+221
-2
lines changed

web/src/llm-api/openrouter.ts

Lines changed: 221 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,22 @@ import type {
2323
OpenRouterErrorMetadata,
2424
} from './types'
2525

26-
// Mutable accumulator threaded through the stream lifecycle. Holds the text
// collected so far plus the metadata the billing paths need.
type StreamState = {
  responseText: string
  reasoningText: string
  // Time-to-first-token in ms; null until the first token arrives.
  ttftMs: number | null
  // Captured from the first regular chunk we see. Needed to bill via the
  // generation-lookup fallback when a stream ends without a usage-bearing chunk
  // (e.g., upstream error chunk, truncated response, network drop).
  generationId: string | null
  model: string | null
  // Set to true once credits have been consumed (either by the normal
  // usage-chunk path or by fallbackBillFromGeneration). ensureBilled checks
  // this flag so a request is never billed twice.
  billed: boolean
}

// How long to wait after stream close before querying OpenRouter's generation
// endpoint. OR finalizes generation records asynchronously; 500ms is enough
// in practice and keeps the delay off the client response path.
const GENERATION_LOOKUP_DELAY_MS = 500
2742

2843
// Extended timeout for deep-thinking models (e.g., gpt-5) that can take
2944
// a long time to start streaming.
@@ -334,9 +349,45 @@ export async function handleOpenRouterStream({
334349
}
335350

336351
let heartbeatInterval: NodeJS.Timeout
337-
let state: StreamState = { responseText: '', reasoningText: '', ttftMs: null }
352+
let state: StreamState = {
353+
responseText: '',
354+
reasoningText: '',
355+
ttftMs: null,
356+
generationId: null,
357+
model: null,
358+
billed: false,
359+
}
338360
let clientDisconnected = false
339361

362+
// Runs once on any stream-exit path. If we didn't bill through the normal
363+
// path (stream ended without a usage chunk, got a provider error chunk,
364+
// network drop), ask OpenRouter for the generation's final cost so we still
365+
// capture what we were charged. Without this, a well-timed mid-stream failure
366+
// lets the caller walk away with free completion tokens.
367+
const ensureBilled = async () => {
368+
if (state.billed || !state.generationId) return
369+
await new Promise((resolve) =>
370+
setTimeout(resolve, GENERATION_LOOKUP_DELAY_MS),
371+
)
372+
await fallbackBillFromGeneration({
373+
generationId: state.generationId,
374+
openrouterApiKey,
375+
userId,
376+
stripeCustomerId,
377+
agentId,
378+
clientId,
379+
clientRequestId,
380+
costMode,
381+
byok,
382+
startTime,
383+
state,
384+
request: body,
385+
fetch,
386+
logger,
387+
insertMessage: insertMessageBigquery,
388+
})
389+
}
390+
340391
// Create a ReadableStream that Next.js can handle
341392
const stream = new ReadableStream({
342393
async start(controller) {
@@ -420,6 +471,7 @@ export async function handleOpenRouterStream({
420471
if (!clientDisconnected) {
421472
controller.close()
422473
}
474+
await ensureBilled()
423475
} catch (error) {
424476
if (!clientDisconnected) {
425477
controller.error(error)
@@ -429,6 +481,7 @@ export async function handleOpenRouterStream({
429481
'Error after client disconnect in OpenRouter stream',
430482
)
431483
}
484+
await ensureBilled()
432485
} finally {
433486
clearInterval(heartbeatInterval)
434487
}
@@ -609,6 +662,7 @@ async function handleResponse({
609662
ttftMs: state.ttftMs,
610663
})
611664

665+
state.billed = true
612666
return { state, billedCredits }
613667
}
614668

@@ -633,6 +687,17 @@ async function handleStreamChunk({
633687
// still storing enough data for logging and billing. 1MB is a generous limit.
634688
const MAX_BUFFER_SIZE = 1 * 1024 * 1024 // 1MB
635689

690+
// Capture generation id and model from any regular chunk so we can still
691+
// bill via the generation-lookup fallback if the stream never emits usage.
692+
if (!('error' in data)) {
693+
if (data.id && !state.generationId) {
694+
state.generationId = data.id
695+
}
696+
if (data.model && !state.model) {
697+
state.model = data.model
698+
}
699+
}
700+
636701
if ('error' in data) {
637702
// Log detailed error information for stream errors (e.g., Forbidden from Anthropic)
638703
const errorData = data.error as {
@@ -819,6 +884,160 @@ function creditsToFakeCost(credits: number): number {
819884
return credits / ((1 + PROFIT_MARGIN) * 100)
820885
}
821886

887+
/**
 * Bill a stream that exited before a usage-bearing chunk arrived by looking up
 * the generation cost from OpenRouter's /generation endpoint. Mutates
 * `state.billed` on success so callers can tell the gap was filled.
 *
 * Never throws — failures are logged and swallowed. The worst case is that we
 * miss this one request, which is still strictly better than the old behavior.
 *
 * @param params.generationId  OpenRouter generation id captured from a stream chunk.
 * @param params.openrouterApiKey  Caller-supplied key; falls back to the server's
 *   env key when null.
 * @param params.state  Shared stream state; `state.billed` is set true only after
 *   credits are successfully consumed.
 * @param params.fetch  Injected fetch implementation (eases testing/instrumentation).
 */
async function fallbackBillFromGeneration(params: {
  generationId: string
  openrouterApiKey: string | null
  userId: string
  stripeCustomerId?: string | null
  agentId: string
  clientId: string | null
  clientRequestId: string | null
  costMode: string | undefined
  byok: boolean
  startTime: Date
  state: StreamState
  request: unknown
  fetch: typeof globalThis.fetch
  logger: Logger
  insertMessage: InsertMessageBigqueryFn
}): Promise<void> {
  const {
    generationId,
    openrouterApiKey,
    userId,
    stripeCustomerId,
    agentId,
    clientId,
    clientRequestId,
    costMode,
    byok,
    startTime,
    state,
    request,
    fetch,
    logger,
    insertMessage,
  } = params

  try {
    const response = await fetch(
      `https://openrouter.ai/api/v1/generation?id=${encodeURIComponent(generationId)}`,
      {
        method: 'GET',
        headers: {
          // Use the caller's key when present so BYOK lookups hit their account;
          // otherwise fall back to our server key.
          Authorization: `Bearer ${openrouterApiKey ?? env.OPEN_ROUTER_API_KEY}`,
        },
      },
    )

    if (!response.ok) {
      logger.error(
        {
          generationId,
          status: response.status,
          statusText: response.statusText,
          userId,
          agentId,
          model: state.model,
          responseTextLength: state.responseText.length,
        },
        'fallbackBillFromGeneration: generation lookup failed',
      )
      return
    }

    const body = (await response.json()) as { data?: Record<string, unknown> }
    const data = body?.data
    if (!data) {
      logger.warn(
        { generationId, userId, agentId },
        'fallbackBillFromGeneration: generation lookup returned no data',
      )
      return
    }

    // Coerce loosely-typed response fields: non-numbers become 0.
    const num = (v: unknown) => (typeof v === 'number' ? v : 0)
    const usageData: UsageData = {
      // `||` deliberately falls through to the native counts when the
      // normalized count is 0 or absent.
      inputTokens: num(data.tokens_prompt) || num(data.native_tokens_prompt),
      outputTokens:
        num(data.tokens_completion) || num(data.native_tokens_completion),
      cacheReadInputTokens: num(data.native_tokens_cached),
      reasoningTokens: num(data.native_tokens_reasoning),
      cost: num(data.total_cost),
    }
    // Prefer the model captured from the stream; fall back to the lookup's
    // model field, or '' when neither is available.
    const resolvedModel =
      state.model ?? (typeof data.model === 'string' ? data.model : '')

    // warn (not info) so these fallback billings are easy to find in logs.
    logger.warn(
      {
        generationId,
        userId,
        agentId,
        model: resolvedModel,
        cost: usageData.cost,
        inputTokens: usageData.inputTokens,
        outputTokens: usageData.outputTokens,
        responseTextLength: state.responseText.length,
      },
      'fallbackBillFromGeneration: billing from generation lookup (stream exited without usage chunk)',
    )

    // Fire-and-forget: analytics logging must not block or fail billing.
    insertMessageToBigQuery({
      messageId: generationId,
      userId,
      startTime,
      request,
      reasoningText: state.reasoningText,
      responseText: state.responseText,
      usageData,
      logger,
      insertMessageBigquery: insertMessage,
    }).catch((error) => {
      logger.error(
        { error: getErrorObject(error), generationId },
        'fallbackBillFromGeneration: BigQuery insert failed',
      )
    })

    await consumeCreditsForMessage({
      messageId: generationId,
      userId,
      stripeCustomerId,
      agentId,
      clientId,
      clientRequestId,
      startTime,
      model: resolvedModel,
      reasoningText: state.reasoningText,
      responseText: state.responseText,
      usageData,
      byok,
      logger,
      costMode,
      ttftMs: state.ttftMs,
    })
    // Only mark billed once credits were actually consumed; an exception above
    // lands in the catch and leaves the flag false.
    state.billed = true
  } catch (error) {
    logger.error(
      {
        error: getErrorObject(error),
        generationId,
        userId,
        agentId,
      },
      'fallbackBillFromGeneration threw',
    )
  }
}
1040+
8221041
/**
8231042
* Overwrite the cost field in the final SSE chunk to reflect actual billed credits.
8241043
* This ensures the SDK calculates the exact credits value we stored in the database,

0 commit comments

Comments
 (0)