Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 94 additions & 16 deletions packages/core/src/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,11 @@ export async function sendViaRelay(options: {
}

export const WORKER_SCRIPT = `
function getPlanConfig(env) {
const paid = (env.RELAY_PLAN || '').toLowerCase() === 'paid'
return { paid, allowWebSocket: paid, logRequests: paid }
}

async function hashBody(body) {
const bytes = new TextEncoder().encode(body)
const digest = await crypto.subtle.digest('SHA-256', bytes)
Expand Down Expand Up @@ -1032,7 +1037,7 @@ async function resolveBody(env, payload) {
return { error: 'unknown mode', status: 400 }
}

async function prepareUpstream(env, payload) {
async function prepareUpstream(env, payload, config) {
if ((payload.protocol !== 1 && payload.protocol !== 2) || payload.type !== 'request' || !payload.affinity || !payload.upstream?.url || !payload.next_hash) {
return { error: 'invalid payload', status: 400 }
}
Expand All @@ -1045,14 +1050,16 @@ async function prepareUpstream(env, payload) {
}

const stateWrite = writeState(env, payload.affinity, { body, hash: payload.next_hash, revision: payload.revision }).catch(() => {})
console.log(JSON.stringify({
relay: 'opencode-anthropic-auth',
transport: 'relay',
mode: payload.mode,
revision: payload.revision,
affinity: String(payload.affinity).slice(0, 12),
bodyBytes: body.length,
}))
if (config.logRequests) {
Comment thread
iceteaSA marked this conversation as resolved.
console.log(JSON.stringify({
relay: 'opencode-anthropic-auth',
transport: 'http',
mode: payload.mode,
revision: payload.revision,
affinity: String(payload.affinity).slice(0, 12),
bodyBytes: body.length,
}))
}

return { body, stateWrite }
}
Expand Down Expand Up @@ -1123,18 +1130,51 @@ function headersToObject(headers) {
return result
}

async function handleRelayPayload(env, payload) {
const prepared = await prepareUpstream(env, payload)
const SKIP_ERROR_LOG_STATUSES = new Set([429, 403])

async function logUpstreamError(env, ctx, upstream, meta) {
if (!upstream.status || upstream.status < 400 || SKIP_ERROR_LOG_STATUSES.has(upstream.status)) return
try {
const body = await upstream.clone().text()
const key = 'error:' + Date.now() + ':' + (meta.id || meta.affinity || 'unknown')
const entry = JSON.stringify({
ts: new Date().toISOString(),
status: upstream.status,
statusText: upstream.statusText,
transport: meta.transport,
mode: meta.mode,
affinity: meta.affinity,
id: meta.id,
bodyBytes: meta.bodyBytes,
responseBody: body.slice(0, 50000),
responseHeaders: headersToObject(upstream.headers),
})
const kvWrite = env.RELAY_STATE.put(key, entry, { expirationTtl: 604800 }).catch(() => {})
if (ctx?.waitUntil) ctx.waitUntil(kvWrite)
else void kvWrite
console.error(JSON.stringify({
relay: 'opencode-anthropic-auth',
event: 'upstream_error',
status: upstream.status,
transport: meta.transport,
affinity: String(meta.affinity || '').slice(0, 12),
responsePreview: body.slice(0, 500),
}))
} catch {}
}

async function handleRelayPayload(env, payload, config) {
const prepared = await prepareUpstream(env, payload, config)
if (prepared.error) return prepared
const upstream = await fetch(payload.upstream.url, {
method: payload.upstream.method || 'POST',
headers: payload.upstream.headers,
body: prepared.body,
})
return { upstream, stateWrite: prepared.stateWrite }
return { upstream, stateWrite: prepared.stateWrite, bodyBytes: prepared.body.length }
}

async function handleWebSocket(socket, env, ctx, payload, getState, setState) {
async function handleWebSocket(socket, env, ctx, payload, getState, setState, config) {
const heartbeat = setInterval(() => {
try {
socket.send(JSON.stringify({ protocol: 2, type: 'keepalive' }))
Expand All @@ -1151,6 +1191,16 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) {

setState(result.state)
socket.send(JSON.stringify({ protocol: 2, type: 'accepted', id: payload.id, hash: result.state.hash, revision: result.state.revision }))
if (config.logRequests) {
console.log(JSON.stringify({
relay: 'opencode-anthropic-auth',
transport: 'websocket',
mode: payload.mode,
revision: payload.revision,
affinity: String(payload.affinity).slice(0, 12),
bodyBytes: result.body.length,
}))
}
ctx?.waitUntil?.(deferWorkerTask(result.logAccepted))

const upstreamPromise = fetch(payload.upstream.url, {
Expand All @@ -1160,6 +1210,19 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) {
})
ctx?.waitUntil?.(result.checkpoint)
const upstream = await upstreamPromise
// Log non-429/403 errors to KV for debugging
if (upstream.status >= 400 && !SKIP_ERROR_LOG_STATUSES.has(upstream.status)) {
const errorClone = upstream.clone()
const errorLog = logUpstreamError(env, ctx, errorClone, {
transport: 'websocket',
mode: payload.mode,
affinity: payload.affinity,
id: payload.id,
bodyBytes: result.body.length,
})
if (ctx?.waitUntil) ctx.waitUntil(errorLog)
else void errorLog
}
socket.send(JSON.stringify({
protocol: 2,
type: 'response_start',
Expand Down Expand Up @@ -1187,7 +1250,12 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) {

export default {
async fetch(request, env, ctx) {
const config = getPlanConfig(env)

if (request.headers.get('Upgrade') === 'websocket') {
if (!config.allowWebSocket) {
return new Response('WebSocket transport requires Workers Paid plan. Use HTTP transport or upgrade your plan.', { status: 403 })
}
const url = new URL(request.url)
const token = url.searchParams.get('token')
const affinity = url.searchParams.get('affinity')
Expand Down Expand Up @@ -1233,15 +1301,19 @@ export default {
}
payload.affinity = affinity
busy = true
const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }).finally(() => { busy = false })
const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }, config).finally(() => { busy = false })
ctx?.waitUntil?.(run)
if (!ctx?.waitUntil) void run
})
return new Response(null, { status: 101, webSocket: client })
}

if (request.method === 'GET') {
return Response.json({ status: 'ok', transports: ['http', 'websocket'] })
return Response.json({
status: 'ok',
plan: config.paid ? 'paid' : 'free',
transports: config.allowWebSocket ? ['http', 'websocket'] : ['http'],
})
}
if (request.method !== 'POST') return new Response('method not allowed', { status: 405 })
if (request.headers.get('x-relay-token') !== env.RELAY_TOKEN) {
Expand All @@ -1250,12 +1322,18 @@ export default {

try {
const payload = await request.json()
const result = await handleRelayPayload(env, payload)
const result = await handleRelayPayload(env, payload, config)
if (result.error) return Response.json({ error: result.error }, { status: result.status })

if (result.stateWrite) ctx.waitUntil(result.stateWrite)

const upstream = result.upstream
await logUpstreamError(env, ctx, upstream, {
transport: 'http',
mode: payload.mode,
affinity: payload.affinity,
bodyBytes: result.bodyBytes,
})
return new Response(upstream.body, {
status: upstream.status,
statusText: upstream.statusText,
Expand Down
2 changes: 1 addition & 1 deletion packages/opencode/src/tests/relay-worker-miniflare.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async function startWorker() {
modules: true,
compatibilityDate: '2026-04-28',
kvNamespaces: ['RELAY_STATE'],
bindings: { RELAY_TOKEN },
bindings: { RELAY_TOKEN, RELAY_PLAN: 'paid' },
port: 0,
log: new NoOpLog(),
})
Expand Down
Loading