From 5e33a830d11a759699835652a1fcef4f3b3d060c Mon Sep 17 00:00:00 2001 From: iceteaSA <171169159+iceteaSA@users.noreply.github.com> Date: Thu, 21 May 2026 21:05:45 +0200 Subject: [PATCH] feat(relay): add plan-based gating, KV error logging, and request logging Worker script improvements: - Plan-based gating: WebSocket transport requires paid plan (RELAY_PLAN=paid) - KV error logging: non-429/403 upstream errors logged to KV with 7-day TTL - Request logging: HTTP and WebSocket requests logged on paid plan - GET health endpoint returns plan info and available transports --- packages/core/src/relay.ts | 110 +++++++++++++++--- .../src/tests/relay-worker-miniflare.test.ts | 2 +- 2 files changed, 95 insertions(+), 17 deletions(-) diff --git a/packages/core/src/relay.ts b/packages/core/src/relay.ts index 1eaef7b..fac3b45 100644 --- a/packages/core/src/relay.ts +++ b/packages/core/src/relay.ts @@ -985,6 +985,11 @@ export async function sendViaRelay(options: { } export const WORKER_SCRIPT = ` +function getPlanConfig(env) { + const paid = (env.RELAY_PLAN || '').toLowerCase() === 'paid' + return { paid, allowWebSocket: paid, logRequests: paid } +} + async function hashBody(body) { const bytes = new TextEncoder().encode(body) const digest = await crypto.subtle.digest('SHA-256', bytes) @@ -1032,7 +1037,7 @@ async function resolveBody(env, payload) { return { error: 'unknown mode', status: 400 } } -async function prepareUpstream(env, payload) { +async function prepareUpstream(env, payload, config) { if ((payload.protocol !== 1 && payload.protocol !== 2) || payload.type !== 'request' || !payload.affinity || !payload.upstream?.url || !payload.next_hash) { return { error: 'invalid payload', status: 400 } } @@ -1045,14 +1050,16 @@ async function prepareUpstream(env, payload) { } const stateWrite = writeState(env, payload.affinity, { body, hash: payload.next_hash, revision: payload.revision }).catch(() => {}) - console.log(JSON.stringify({ - relay: 'opencode-anthropic-auth', - transport: 'relay', - mode: payload.mode, - revision: payload.revision, - affinity: String(payload.affinity).slice(0, 12), - bodyBytes: body.length, - })) + if (config.logRequests) { + console.log(JSON.stringify({ + relay: 'opencode-anthropic-auth', + transport: 'http', + mode: payload.mode, + revision: payload.revision, + affinity: String(payload.affinity).slice(0, 12), + bodyBytes: body.length, + })) + } return { body, stateWrite } } @@ -1123,18 +1130,51 @@ function headersToObject(headers) { return result } -async function handleRelayPayload(env, payload) { - const prepared = await prepareUpstream(env, payload) +const SKIP_ERROR_LOG_STATUSES = new Set([429, 403]) + +async function logUpstreamError(env, ctx, upstream, meta) { + if (!upstream.status || upstream.status < 400 || SKIP_ERROR_LOG_STATUSES.has(upstream.status)) return + try { + const body = await upstream.clone().text() + const key = 'error:' + Date.now() + ':' + (meta.id || meta.affinity || 'unknown') + const entry = JSON.stringify({ + ts: new Date().toISOString(), + status: upstream.status, + statusText: upstream.statusText, + transport: meta.transport, + mode: meta.mode, + affinity: meta.affinity, + id: meta.id, + bodyBytes: meta.bodyBytes, + responseBody: body.slice(0, 50000), + responseHeaders: headersToObject(upstream.headers), + }) + const kvWrite = env.RELAY_STATE.put(key, entry, { expirationTtl: 604800 }).catch(() => {}) + if (ctx?.waitUntil) ctx.waitUntil(kvWrite) + else void kvWrite + console.error(JSON.stringify({ + relay: 'opencode-anthropic-auth', + event: 'upstream_error', + status: upstream.status, + transport: meta.transport, + affinity: String(meta.affinity || '').slice(0, 12), + responsePreview: body.slice(0, 500), + })) + } catch {} +} + +async function handleRelayPayload(env, payload, config) { + const prepared = await prepareUpstream(env, payload, config) if (prepared.error) return prepared const upstream = await fetch(payload.upstream.url, { method: payload.upstream.method || 'POST', headers: payload.upstream.headers, body: prepared.body, }) - return { upstream, stateWrite: prepared.stateWrite } + return { upstream, stateWrite: prepared.stateWrite, bodyBytes: prepared.body.length } } -async function handleWebSocket(socket, env, ctx, payload, getState, setState) { +async function handleWebSocket(socket, env, ctx, payload, getState, setState, config) { const heartbeat = setInterval(() => { try { socket.send(JSON.stringify({ protocol: 2, type: 'keepalive' })) @@ -1151,6 +1191,16 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) { setState(result.state) socket.send(JSON.stringify({ protocol: 2, type: 'accepted', id: payload.id, hash: result.state.hash, revision: result.state.revision })) + if (config.logRequests) { + console.log(JSON.stringify({ + relay: 'opencode-anthropic-auth', + transport: 'websocket', + mode: payload.mode, + revision: payload.revision, + affinity: String(payload.affinity).slice(0, 12), + bodyBytes: result.body.length, + })) + } ctx?.waitUntil?.(deferWorkerTask(result.logAccepted)) const upstreamPromise = fetch(payload.upstream.url, { @@ -1160,6 +1210,19 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) { }) ctx?.waitUntil?.(result.checkpoint) const upstream = await upstreamPromise + // Log non-429/403 errors to KV for debugging + if (upstream.status >= 400 && !SKIP_ERROR_LOG_STATUSES.has(upstream.status)) { + const errorClone = upstream.clone() + const errorLog = logUpstreamError(env, ctx, errorClone, { + transport: 'websocket', + mode: payload.mode, + affinity: payload.affinity, + id: payload.id, + bodyBytes: result.body.length, + }) + if (ctx?.waitUntil) ctx.waitUntil(errorLog) + else void errorLog + } socket.send(JSON.stringify({ protocol: 2, type: 'response_start', @@ -1187,7 +1250,12 @@ async function handleWebSocket(socket, env, ctx, payload, getState, setState) { export default { async fetch(request, env, ctx) { + const config = getPlanConfig(env) + if (request.headers.get('Upgrade') === 'websocket') { + if (!config.allowWebSocket) { + return new Response('WebSocket transport requires Workers Paid plan. Use HTTP transport or upgrade your plan.', { status: 403 }) + } const url = new URL(request.url) const token = url.searchParams.get('token') const affinity = url.searchParams.get('affinity') @@ -1233,7 +1301,7 @@ export default { } payload.affinity = affinity busy = true - const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }).finally(() => { busy = false }) + const run = handleWebSocket(server, env, ctx, payload, () => state, (nextState) => { state = nextState }, config).finally(() => { busy = false }) ctx?.waitUntil?.(run) if (!ctx?.waitUntil) void run }) @@ -1241,7 +1309,11 @@ export default { } if (request.method === 'GET') { - return Response.json({ status: 'ok', transports: ['http', 'websocket'] }) + return Response.json({ + status: 'ok', + plan: config.paid ? 'paid' : 'free', + transports: config.allowWebSocket ? ['http', 'websocket'] : ['http'], + }) } if (request.method !== 'POST') return new Response('method not allowed', { status: 405 }) if (request.headers.get('x-relay-token') !== env.RELAY_TOKEN) { @@ -1250,12 +1322,18 @@ export default { try { const payload = await request.json() - const result = await handleRelayPayload(env, payload) + const result = await handleRelayPayload(env, payload, config) if (result.error) return Response.json({ error: result.error }, { status: result.status }) if (result.stateWrite) ctx.waitUntil(result.stateWrite) const upstream = result.upstream + await logUpstreamError(env, ctx, upstream, { + transport: 'http', + mode: payload.mode, + affinity: payload.affinity, + bodyBytes: result.bodyBytes, + }) return new Response(upstream.body, { status: upstream.status, statusText: upstream.statusText, diff --git a/packages/opencode/src/tests/relay-worker-miniflare.test.ts b/packages/opencode/src/tests/relay-worker-miniflare.test.ts index aac32c3..325a736 100644 --- a/packages/opencode/src/tests/relay-worker-miniflare.test.ts +++ b/packages/opencode/src/tests/relay-worker-miniflare.test.ts @@ -48,7 +48,7 @@ async function startWorker() { modules: true, compatibilityDate: '2026-04-28', kvNamespaces: ['RELAY_STATE'], - bindings: { RELAY_TOKEN }, + bindings: { RELAY_TOKEN, RELAY_PLAN: 'paid' }, port: 0, log: new NoOpLog(), })