diff --git a/server.js b/server.js index 442feef..fc0baa0 100755 --- a/server.js +++ b/server.js @@ -66,6 +66,12 @@ let DS_CONFIG = {}; let dsHeaders = {}; const accounts = []; let accountRoundRobin = 0; +let inFlight = 0; // concurrent in-flight completions (backpressure cap) +// Overall wall-clock budget for one inbound request (caps the retry/continuation +// loops), max concurrent completions, and the empty-response retry cap. +const REQUEST_DEADLINE_MS = Number(process.env.DEEPSEEK_REQUEST_DEADLINE_MS || 120000); +const MAX_CONCURRENT = Number(process.env.DEEPSEEK_MAX_CONCURRENT || 24); +const MAX_EMPTY_RETRIES = Number(process.env.DEEPSEEK_MAX_RETRIES || 4); function buildBaseHeaders(config = DS_CONFIG) { return { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/149.0.0.0 Safari/537.36", @@ -157,9 +163,15 @@ function selectAccountForSession(session) { const waiting = accounts.filter(a => a.config.token && a.config.cookie).sort((a, b) => a.cooldownUntil - b.cooldownUntil)[0]; if (waiting) { const waitSec = Math.max(1, Math.ceil((waiting.cooldownUntil - now) / 1000)); - throw new Error(`All DeepSeek auth accounts are cooling down. Retry in ~${waitSec}s or import a fresh account with npm run auth:import.`); + // Tagged so the request handler returns 429 + Retry-After instead of a + // generic 500 (integrator backoff keys on the status code, not the text). + const err = new Error(`All DeepSeek auth accounts are cooling down. Retry in ~${waitSec}s or import a fresh account with npm run auth:import.`); + err.status = 429; err.retryAfter = waitSec; err.type = 'rate_limit'; + throw err; } - throw new Error('No valid DeepSeek auth accounts. Run npm run auth or npm run auth:import.'); + const noAuth = new Error('No valid DeepSeek auth accounts. Run npm run auth or npm run auth:import.'); + noAuth.status = 503; noAuth.type = 'no_auth'; + throw noAuth; } const account = ready[accountRoundRobin % ready.length]; accountRoundRobin++; @@ -214,6 +226,7 @@ function createSession() { messageCount: 0, accountId: null, history: [], + lastActivityAt: Date.now(), }; } @@ -221,7 +234,21 @@ function getOrCreateAgentSession(agentId) { if (!sessions.has(agentId)) { sessions.set(agentId, createSession()); } - return sessions.get(agentId); + const session = sessions.get(agentId); + session.lastActivityAt = Date.now(); + return session; +} + +// Evict idle sessions so the Map (keyed by client IP / user id) can't grow without +// bound on a long-running process. Drops entries untouched for 2× the session TTL. +function sweepIdleSessions(maxIdleMs = SESSION_TTL_MS * 2) { + const now = Date.now(); + let removed = 0; + for (const [agentId, session] of sessions) { + if (now - (session.lastActivityAt || 0) > maxIdleMs) { sessions.delete(agentId); removed++; } + } + if (removed) console.log(`[DS-API] swept ${removed} idle session(s); ${sessions.size} remain`); + return removed; } async function solvePOW(challenge, config = DS_CONFIG) { @@ -1132,7 +1159,17 @@ const server = http.createServer(async (req, res) => { // Health check if (req.method === 'GET' && (url.pathname === '/' || url.pathname === '/health')) { res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ status: 'ok', service: 'FreeDeepseekAPI', watermark: FORGETMEAI_WATERMARK, models: SUPPORTED_MODEL_IDS, unsupported_models: Object.keys(MODEL_CONFIGS).filter(id => !MODEL_CONFIGS[id].supported), agents: sessions.size, accounts: accounts.map(accountStatus), config_ready: hasAuthConfig(), session_reuse: { strategy: 'sticky per x-agent-session/user', ttl_minutes: Math.round(SESSION_TTL_MS / 60000), max_messages: MAX_MESSAGE_DEPTH, reset_all: 'POST /reset-session?agent=all' } })); + res.end(JSON.stringify({ status: 'ok', service: 'FreeDeepseekAPI', watermark: FORGETMEAI_WATERMARK, models: SUPPORTED_MODEL_IDS, unsupported_models: Object.keys(MODEL_CONFIGS).filter(id => !MODEL_CONFIGS[id].supported), agents: sessions.size, in_flight: inFlight, accounts: accounts.map(accountStatus), config_ready: hasAuthConfig(), session_reuse: { strategy: 'sticky per x-agent-session/user', ttl_minutes: Math.round(SESSION_TTL_MS / 60000), max_messages: MAX_MESSAGE_DEPTH, reset_all: 'POST /reset-session?agent=all' } })); + return; + } + + // Readiness probe (distinct from the liveness check above): 503 unless at least + // one account can serve right now, so an aggregator/LB won't route to a cold pool. + if (req.method === 'GET' && url.pathname === '/readyz') { + const now = Date.now(); + const ready = accounts.filter(a => a.config.token && a.config.cookie && a.cooldownUntil <= now).length; + res.writeHead(ready > 0 ? 200 : 503, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ ready: ready > 0, ready_accounts: ready, total_accounts: accounts.length })); return; } @@ -1203,9 +1240,28 @@ const server = http.createServer(async (req, res) => { res.writeHead(404); res.end('Not found'); return; } + // Backpressure: reject rather than fan out unbounded concurrent upstream work. + if (inFlight >= MAX_CONCURRENT) { + res.writeHead(503, { 'Content-Type': 'application/json', 'Retry-After': '2' }); + res.end(JSON.stringify({ error: { message: `Server busy (${inFlight}/${MAX_CONCURRENT} requests in flight). Retry shortly.`, type: 'overloaded' } })); + return; + } + let body = ''; - req.on('data', chunk => body += chunk); + let bodyTooLarge = false; + const MAX_BODY_BYTES = 10 * 1024 * 1024; // chat payloads are small; cap memory before JSON.parse + req.on('data', chunk => { body += chunk; if (body.length > MAX_BODY_BYTES) { bodyTooLarge = true; req.destroy(); } }); req.on('end', async () => { + if (bodyTooLarge) { + res.writeHead(413, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: { message: 'Request body too large', type: 'payload_too_large' } })); + return; + } + inFlight++; + let clientGone = false; + res.on('close', () => { clientGone = true; }); + const requestStartedAt = Date.now(); + const deadlineHit = () => Date.now() - requestStartedAt > REQUEST_DEADLINE_MS; try { const rawParams = JSON.parse(body || '{}'); const params = normalizeApiParams(rawParams, apiMode); @@ -1393,26 +1449,29 @@ const server = http.createServer(async (req, res) => { // Empty response — retry loop with fresh sessions let retryAttempt = 0; - const MAX_RETRIES = 10; while (!fullContent || fullContent.trim().length === 0) { + // Stop early if the client hung up or we've blown the request budget — + // no point burning more PoW solves + account quota for a dead socket. + if (clientGone) { console.log(`${agentTag} client disconnected; abandoning empty-retry loop`); return; } + if (deadlineHit()) { console.log(`${agentTag} request deadline hit; stopping empty-retry loop`); break; } retryAttempt++; - if (retryAttempt > MAX_RETRIES) { - console.log(`${agentTag} Empty after ${MAX_RETRIES} retries. Giving up.`); + if (retryAttempt > MAX_EMPTY_RETRIES) { + console.log(`${agentTag} Empty after ${MAX_EMPTY_RETRIES} retries. Giving up.`); res.writeHead(502, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ - error: { - message: `DeepSeek returned empty content after ${MAX_RETRIES} retries`, + res.end(JSON.stringify({ + error: { + message: `DeepSeek returned empty content after ${MAX_EMPTY_RETRIES} retries`, type: 'empty_response', agent: agentId, session_id: session.id, message_count: session.messageCount, history_length: session.history.length, retry_attempts: retryAttempt - 1, - } + } })); return; } - console.log(`${agentTag} Empty response (msg#${session.messageCount}, retry ${retryAttempt}/${MAX_RETRIES}). Resetting session...`); + console.log(`${agentTag} Empty response (msg#${session.messageCount}, retry ${retryAttempt}/${MAX_EMPTY_RETRIES}). Resetting session...`); session.id = null; session.parentMessageId = null; session.createdAt = null; @@ -1435,6 +1494,7 @@ const server = http.createServer(async (req, res) => { let continuationRounds = 0; const MAX_CONTINUATION = 2; while ((finishReason === 'length' || fullContent.length > 25000) && continuationRounds < MAX_CONTINUATION) { + if (clientGone || deadlineHit()) break; continuationRounds++; console.log(`${agentTag} Response ${fullContent.length} chars (finish=${finishReason}). Auto-continuing (${continuationRounds}/${MAX_CONTINUATION})...`); await new Promise(r => setTimeout(r, 500)); @@ -1527,8 +1587,16 @@ const server = http.createServer(async (req, res) => { } } catch (e) { console.log('[DS-API] Error:', e.message); - res.writeHead(500, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: { message: e.message, type: 'server_error' } })); + if (res.headersSent || clientGone) return; // streamed/aborted: nothing to send + // Pool exhaustion / no-auth carry an explicit status so integrators see + // 429/503 (not a generic 500) and can honor Retry-After. + const status = e.status || 500; + const headers = { 'Content-Type': 'application/json' }; + if (status === 429 && e.retryAfter) headers['Retry-After'] = String(e.retryAfter); + res.writeHead(status, headers); + res.end(JSON.stringify({ error: { message: e.message, type: e.type || 'server_error' } })); + } finally { + inFlight--; } }); }); @@ -1590,6 +1658,13 @@ async function main() { printBanner(); const shouldStart = await showStartupMenu(); if (!shouldStart) process.exit(0); + server.on('error', (err) => { + if (err.code === 'EADDRINUSE') console.error(`[DS-API] FATAL: port ${PORT} already in use. Set PORT= or stop the other instance.`); + else console.error('[DS-API] server error:', err); + process.exit(1); + }); + // Periodically evict idle sessions (unref'd so it never keeps the process alive). + setInterval(sweepIdleSessions, 10 * 60 * 1000).unref(); server.listen(PORT, HOST, () => { console.log(`[DS-API] Server on http://${HOST}:${PORT} (multi-agent sessions enabled)`); console.log(`[DS-API] ${formatWatermark()}`); @@ -1605,6 +1680,17 @@ async function main() { } if (require.main === module) { + // Don't let a stray rejection/throw take the whole proxy down silently. + process.on('unhandledRejection', (reason) => console.error('[DS-API] unhandledRejection:', reason)); + process.on('uncaughtException', (err) => console.error('[DS-API] uncaughtException:', err)); + // Graceful shutdown: stop accepting, drain, then exit (force-exit after 10s). + const shutdown = (sig) => { + console.log(`[DS-API] ${sig} received — shutting down…`); + server.close(() => process.exit(0)); + setTimeout(() => process.exit(0), 10000).unref(); + }; + process.on('SIGTERM', () => shutdown('SIGTERM')); + process.on('SIGINT', () => shutdown('SIGINT')); main().catch(err => { console.error('[DS-API] FATAL:', err); process.exit(1); }); } @@ -1615,5 +1701,8 @@ module.exports = { isDeepSeekModelErrorEvent, rebuildFragmentText, applyResponsePatchOperations, + createSession, + sweepIdleSessions, + sessions, }, }; diff --git a/tests/unit.test.js b/tests/unit.test.js index a02bc23..a313fb9 100644 --- a/tests/unit.test.js +++ b/tests/unit.test.js @@ -143,3 +143,12 @@ test('DeepSeek stream parser does not treat service content chunks as model erro assert.equal(serverInternals.isDeepSeekModelErrorEvent({ finish_reason: 'stop' }), false); assert.equal(serverInternals.isDeepSeekModelErrorEvent({ type: 'error', content: 'backend error' }), true); }); + +test('sweepIdleSessions evicts only idle entries', () => { + serverInternals.sessions.set('stale-x', { lastActivityAt: 1 }); + serverInternals.sessions.set('fresh-x', { lastActivityAt: Date.now() }); + serverInternals.sweepIdleSessions(60 * 1000); + assert.equal(serverInternals.sessions.has('stale-x'), false); + assert.equal(serverInternals.sessions.has('fresh-x'), true); + serverInternals.sessions.delete('fresh-x'); +});