diff --git a/src/auth/AuthSwitcher.js b/src/auth/AuthSwitcher.js index 11670468..d38ce8dd 100644 --- a/src/auth/AuthSwitcher.js +++ b/src/auth/AuthSwitcher.js @@ -49,6 +49,14 @@ class AuthSwitcher { // return available[nextIndexInArray]; // } + _withTimeout(promise, ms, message) { + let timer; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(message)), ms); + }); + return Promise.race([promise, timeout]).finally(() => clearTimeout(timer)); + } + async switchToNextAuth() { const available = this.authSource.getRotationIndices(); @@ -75,7 +83,11 @@ class AuthSwitcher { this.logger.info("=================================================="); try { - await this.browserManager.launchOrSwitchContext(singleIndex); + await this._withTimeout( + this.browserManager.launchOrSwitchContext(singleIndex), + 60_000, + `Single account #${singleIndex} restart timed out after 60s` + ); this.resetCounters(); this.browserManager.rebalanceContextPool().catch(err => { this.logger.error(`[Auth] Background rebalance failed: ${err.message}`); @@ -131,8 +143,14 @@ class AuthSwitcher { try { // Pre-cleanup: remove excess contexts BEFORE creating new one to avoid exceeding maxContexts - await this.browserManager.preCleanupForSwitch(accountIndex); - await this.browserManager.switchAccount(accountIndex); + await this._withTimeout( + (async () => { + await this.browserManager.preCleanupForSwitch(accountIndex); + await this.browserManager.switchAccount(accountIndex); + })(), + 60_000, + `Account #${accountIndex} switch timed out after 60s` + ); this.resetCounters(); this.browserManager.rebalanceContextPool().catch(err => { this.logger.error(`[Auth] Background rebalance failed: ${err.message}`); @@ -166,8 +184,14 @@ class AuthSwitcher { try { // Pre-cleanup: remove excess contexts BEFORE creating new one to avoid exceeding maxContexts - await this.browserManager.preCleanupForSwitch(originalStartAccount); - await this.browserManager.switchAccount(originalStartAccount); + await this._withTimeout( + (async () => { + await this.browserManager.preCleanupForSwitch(originalStartAccount); + await this.browserManager.switchAccount(originalStartAccount); + })(), + 60_000, + `Fallback account #${originalStartAccount} switch timed out after 60s` + ); this.resetCounters(); this.browserManager.rebalanceContextPool().catch(err => { this.logger.error(`[Auth] Background rebalance failed: ${err.message}`); @@ -227,8 +251,14 @@ class AuthSwitcher { try { this.logger.info(`🔄 [Auth] Starting switch to specified account #${targetIndex}...`); // Pre-cleanup: remove excess contexts BEFORE creating new one to avoid exceeding maxContexts - await this.browserManager.preCleanupForSwitch(targetIndex); - await this.browserManager.switchAccount(targetIndex); + await this._withTimeout( + (async () => { + await this.browserManager.preCleanupForSwitch(targetIndex); + await this.browserManager.switchAccount(targetIndex); + })(), + 60_000, + `Switch to account #${targetIndex} timed out after 60s` + ); this.resetCounters(); this.browserManager.rebalanceContextPool().catch(err => { this.logger.error(`[Auth] Background rebalance failed: ${err.message}`); diff --git a/src/core/BrowserManager.js b/src/core/BrowserManager.js index 6793e30e..5c3aa257 100644 --- a/src/core/BrowserManager.js +++ b/src/core/BrowserManager.js @@ -55,6 +55,8 @@ class BrowserManager { // ConnectionRegistry reference (set after construction to avoid circular dependency) this.connectionRegistry = null; + this._onAuthQueuesDrained = null; + this.pendingContextClosures = new Map(); // Background wakeup service status (instance-level, tracks this.page) // Prevents multiple BackgroundWakeup instances from running simultaneously @@ -148,7 +150,20 @@ class BrowserManager { * @param {ConnectionRegistry} connectionRegistry - The ConnectionRegistry instance */ setConnectionRegistry(connectionRegistry) { + if (this.connectionRegistry && this._onAuthQueuesDrained) { + this.connectionRegistry.off("authQueuesDrained", this._onAuthQueuesDrained); + } this.connectionRegistry = connectionRegistry; + if (this.connectionRegistry) { + this._onAuthQueuesDrained = authIndex => { + this._closePendingContextIfIdle(authIndex).catch(error => { + this.logger.error( + `[ContextPool] Failed to close pending context #${authIndex} after queue drain: ${error.message}` + ); + }); + }; + this.connectionRegistry.on("authQueuesDrained", this._onAuthQueuesDrained); + } } /** @@ -206,9 +221,7 @@ class BrowserManager { // Check if background preload was aborted (only for background tasks) if (isBackgroundTask && this._backgroundPreloadAbort) { this.logger.info(`${logPrefix} WebSocket wait aborted (background preload aborted)`); - throw new Error( - `Context initialization aborted for index ${authIndex} (background preload aborted)` - ); + throw new ContextAbortedError(authIndex, "background preload aborted"); } // Read state fresh each iteration @@ -1491,23 +1504,48 @@ class BrowserManager { * @returns {Promise} Resolves when the background task has been aborted and cleaned up */ async abortBackgroundPreload() { - if (!this._backgroundPreloadTask) { + const currentTask = this._backgroundPreloadTask; + if (!currentTask) { return; // No task to abort } this.logger.info(`[ContextPool] Aborting background preload task...`); this._backgroundPreloadAbort = true; + const timeoutMessage = "Background preload abort timed out after 10s"; try { - await this._backgroundPreloadTask; + await this._withTimeout(currentTask, 10_000, timeoutMessage); } catch (error) { - // Ignore errors from aborted task + if (error.message === timeoutMessage) { + this.logger.warn( + `[ContextPool] Background preload did not stop within 10s, forcing browser/context pool reset.` + ); + if (this.browser) { + await this.closeBrowser(); + } else { + this._cleanupAllContexts(); + } + if (this._backgroundPreloadTask === currentTask) { + this._backgroundPreloadTask = null; + } + return; + } + + // Ignore non-timeout errors from aborted task this.logger.debug(`[ContextPool] Background preload aborted: ${error.message}`); } this.logger.info(`[ContextPool] Background preload aborted successfully`); } + _withTimeout(promise, ms, message) { + let timer; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(message)), ms); + }); + return Promise.race([promise, timeout]).finally(() => clearTimeout(timer)); + } + /** * Background sequential initialization of contexts (fire-and-forget) * Only one instance should be active at a time - new calls abort old ones @@ -1537,6 +1575,105 @@ class BrowserManager { }); } + _getActiveQueueCountForAuth(authIndex) { + if (!this.connectionRegistry?.getMessageQueueCountForAuth) { + return 0; + } + return this.connectionRegistry.getMessageQueueCountForAuth(authIndex); + } + + _cancelPendingContextClosure(authIndex, reason = "closure_cancelled") { + if (!this.pendingContextClosures.has(authIndex)) { + return false; + } + this.pendingContextClosures.delete(authIndex); + this.logger.info(`[ContextPool] Cancelled pending close for context #${authIndex} (${reason}).`); + return true; + } + + _scheduleContextClosureWhenIdle(authIndex, reason, activeQueueCount) { + if (!this.contexts.has(authIndex)) { + return false; + } + const existingReason = this.pendingContextClosures.get(authIndex); + if (existingReason) { + this.logger.debug( + `[ContextPool] Context #${authIndex} is already pending close (${existingReason}), active queues: ${activeQueueCount}` + ); + return false; + } + this.pendingContextClosures.set(authIndex, reason); + this.logger.info( + `[ContextPool] Deferring close for busy context #${authIndex} (${activeQueueCount} active queue(s), reason: ${reason})` + ); + return false; + } + + async _closeContextForPoolIfPossible(authIndex, reason) { + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + return this._scheduleContextClosureWhenIdle(authIndex, reason, activeQueueCount); + } + + this.pendingContextClosures.delete(authIndex); + await this.closeContext(authIndex); + return true; + } + + async _closePendingContextIfIdle(authIndex) { + if (!this.pendingContextClosures.has(authIndex)) { + return false; + } + if (authIndex === this._currentAuthIndex) { + this.logger.debug( + `[ContextPool] Skipping pending close for context #${authIndex} because it is active again as current.` + ); + return false; + } + if (!this.contexts.has(authIndex)) { + this.pendingContextClosures.delete(authIndex); + return false; + } + + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + this.logger.debug( + `[ContextPool] Pending close for context #${authIndex} is still waiting on ${activeQueueCount} active queue(s).` + ); + return false; + } + + const pendingReason = this.pendingContextClosures.get(authIndex); + this.pendingContextClosures.delete(authIndex); + this.logger.info( + `[ContextPool] Closing deferred context #${authIndex} now that all queues are drained (reason: ${pendingReason}).` + ); + await this.closeContext(authIndex); + return true; + } + + async _flushPendingContextClosures() { + for (const authIndex of [...this.pendingContextClosures.keys()]) { + await this._closePendingContextIfIdle(authIndex); + } + } + + _prioritizeContextsForRemoval(indices) { + const idle = []; + const busy = []; + + for (const authIndex of indices) { + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + busy.push({ activeQueueCount, authIndex }); + } else { + idle.push(authIndex); + } + } + + return { busy, idle }; + } + /** * Internal method to execute the actual preload task * @private @@ -1759,15 +1896,32 @@ class BrowserManager { } } - // Remove contexts according to priority until we have enough space - const toRemove = removalPriority.slice(0, removeCount); + const { busy, idle } = this._prioritizeContextsForRemoval(removalPriority); + const toRemoveNow = idle.slice(0, removeCount); + const toDefer = busy.slice(0, Math.max(0, removeCount - toRemoveNow.length)); this.logger.info( - `[ContextPool] Pre-cleanup: removing ${toRemove.length} contexts before switch to #${targetAuthIndex}: [${toRemove}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)` + `[ContextPool] Pre-cleanup before switch to #${targetAuthIndex}: immediate=[${toRemoveNow}], deferred=[${toDefer.map( + entry => `${entry.authIndex}:${entry.activeQueueCount}` + )}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)` ); - for (const idx of toRemove) { - await this.closeContext(idx); + for (const idx of toRemoveNow) { + await this._closeContextForPoolIfPossible(idx, "pre_cleanup_for_switch"); + } + + for (const entry of toDefer) { + this._scheduleContextClosureWhenIdle( + entry.authIndex, + "pre_cleanup_for_switch", + entry.activeQueueCount + ); + } + + if (toDefer.length > 0) { + this.logger.warn( + `[ContextPool] Allowing temporary MAX_CONTEXTS overflow while busy contexts drain before switch to #${targetAuthIndex}.` + ); } } @@ -1840,12 +1994,20 @@ class BrowserManager { // If a foreground task is running, _executePreloadTask will skip it (line 1382) const candidates = ordered.filter(idx => !activeContexts.has(idx)); + const { busy, idle } = this._prioritizeContextsForRemoval(toRemove); + this.logger.info( - `[ContextPool] Rebalance: targets=[${[...targets]}], remove=[${toRemove}], candidates=[${candidates}]` + `[ContextPool] Rebalance: targets=[${[...targets]}], removeNow=[${idle}], removeDeferred=[${busy.map( + entry => `${entry.authIndex}:${entry.activeQueueCount}` + )}], candidates=[${candidates}]` ); - for (const idx of toRemove) { - await this.closeContext(idx); + for (const idx of idle) { + await this._closeContextForPoolIfPossible(idx, "rebalance"); + } + + for (const entry of busy) { + this._scheduleContextClosureWhenIdle(entry.authIndex, "rebalance", entry.activeQueueCount); } // Preload candidates if we have room in the pool @@ -2120,6 +2282,8 @@ class BrowserManager { throw new Error(`Invalid authIndex: ${authIndex}. Must be >= 0.`); } + this._cancelPendingContextClosure(authIndex, "context_reused"); + // [Auth Switch] Save current auth data before switching if (this.browser && this._currentAuthIndex >= 0 && this._currentAuthIndex !== authIndex) { try { @@ -2199,6 +2363,7 @@ class BrowserManager { // Switch to new context this._activateContext(contextData.context, contextData.page, authIndex); + await this._flushPendingContextClosures(); this.logger.info(`✅ [FastSwitch] Switched to account #${authIndex} instantly!`); return; @@ -2254,6 +2419,7 @@ class BrowserManager { const { context, page } = await this._initializeContext(authIndex, false); this._activateContext(context, page, authIndex); + await this._flushPendingContextClosures(); // If this account was marked as expired but login succeeded, restore it if (this.authSource.isExpired(authIndex)) { @@ -2461,6 +2627,8 @@ class BrowserManager { * @param {number} authIndex - The auth index to close */ async closeContext(authIndex) { + this.pendingContextClosures.delete(authIndex); + // If context is being initialized in background, signal abort and wait if (this.initializingContexts.has(authIndex)) { this.logger.info(`[Browser] Context #${authIndex} is being initialized, marking for abort and waiting...`); @@ -2554,6 +2722,7 @@ class BrowserManager { this.contexts.clear(); this.initializingContexts.clear(); this.abortedContexts.clear(); + this.pendingContextClosures.clear(); this._wsInitState.clear(); this.context = null; this.page = null; diff --git a/src/core/ConnectionRegistry.js b/src/core/ConnectionRegistry.js index 5be21071..73082be3 100644 --- a/src/core/ConnectionRegistry.js +++ b/src/core/ConnectionRegistry.js @@ -426,6 +426,7 @@ class ConnectionRegistry extends EventEmitter { // This prevents stale queues from lingering when retrying failed requests const existingEntry = this.messageQueues.get(requestId); if (existingEntry) { + const existingAuthIndex = existingEntry.authIndex; this.logger.debug( `[Registry] Found existing message queue for request ${requestId} (authIndex=${existingEntry.authIndex}), closing it before creating new one` ); @@ -435,6 +436,7 @@ class ConnectionRegistry extends EventEmitter { this.logger.debug(`[Registry] Failed to close existing queue for ${requestId}: ${e.message}`); } this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(existingAuthIndex); } const queue = new MessageQueue(); @@ -456,8 +458,10 @@ class ConnectionRegistry extends EventEmitter { removeMessageQueue(requestId, reason = "handler_cleanup") { const entry = this.messageQueues.get(requestId); if (entry) { + const authIndex = entry.authIndex; entry.queue.close(reason); this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(authIndex); } } @@ -481,6 +485,21 @@ class ConnectionRegistry extends EventEmitter { return entry ? entry.requestAttemptId || null : null; } + /** + * Get the number of active message queues for a specific account + * @param {number} authIndex - The account index to inspect + * @returns {number} Number of active message queues + */ + getMessageQueueCountForAuth(authIndex) { + let count = 0; + for (const entry of this.messageQueues.values()) { + if (entry.authIndex === authIndex) { + count++; + } + } + return count; + } + /** * Close all message queues belonging to a specific account * @param {number} authIndex - The account whose queues should be closed @@ -505,6 +524,7 @@ class ConnectionRegistry extends EventEmitter { `[Registry] Force closed ${count} pending message queue(s) for account #${authIndex} (reason: ${reason})` ); } + this._emitAuthQueuesDrainedIfNeeded(authIndex); return count; } @@ -548,6 +568,7 @@ class ConnectionRegistry extends EventEmitter { this.logger.debug(`[Registry] Failed to close stale queue for ${requestId}: ${e.message}`); } this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(entry.authIndex); cleanedCount++; } } @@ -559,6 +580,15 @@ class ConnectionRegistry extends EventEmitter { return cleanedCount; } + _emitAuthQueuesDrainedIfNeeded(authIndex) { + if (!Number.isInteger(authIndex) || authIndex < 0) { + return; + } + if (this.getMessageQueueCountForAuth(authIndex) === 0) { + this.emit("authQueuesDrained", authIndex); + } + } + /** * Safely close a WebSocket connection with readyState check * @param {WebSocket} ws - The WebSocket to close diff --git a/src/core/RequestHandler.js b/src/core/RequestHandler.js index 0faddee6..63498f63 100644 --- a/src/core/RequestHandler.js +++ b/src/core/RequestHandler.js @@ -21,6 +21,7 @@ const WS_CONNECTION_READY_TIMEOUT_MS = 10000; const TIMEOUTS = { FAKE_STREAM: 300000, // 300 seconds (5 minutes) - timeout for fake streaming (buffered response) STREAM_CHUNK: 60000, // 60 seconds - timeout between stream chunks + ACCOUNT_SWITCH: 60000, // 60 seconds - timeout for account switch / direct recovery operations }; class RequestHandler { @@ -657,16 +658,21 @@ class RequestHandler { this.authSwitcher.isSystemBusy = true; this.logger.info(`[System] Set isSystemBusy=true for direct recovery to account #${recoveryAuthIndex}`); - await this.browserManager.launchOrSwitchContext(recoveryAuthIndex); - this.logger.info(`✅ [System] Browser successfully recovered to account #${recoveryAuthIndex}!`); + await this._withTimeout( + (async () => { + await this.browserManager.launchOrSwitchContext(recoveryAuthIndex); + this.logger.info(`✅ [System] Browser successfully recovered to account #${recoveryAuthIndex}!`); - // Wait for WebSocket connection to be established - this.logger.info("[System] Waiting for WebSocket connection to be ready..."); - const connectionReady = await this._waitForConnection(WS_CONNECTION_READY_TIMEOUT_MS); - if (!connectionReady) { - throw new Error("WebSocket connection not established within timeout period"); - } - this.logger.info("✅ [System] WebSocket connection is ready!"); + this.logger.info("[System] Waiting for WebSocket connection to be ready..."); + const connectionReady = await this._waitForConnection(WS_CONNECTION_READY_TIMEOUT_MS); + if (!connectionReady) { + throw new Error("WebSocket connection not established within timeout period"); + } + this.logger.info("✅ [System] WebSocket connection is ready!"); + })(), + this.timeouts.ACCOUNT_SWITCH, + `Direct recovery to account #${recoveryAuthIndex} timed out after 60s` + ); recoverySuccess = true; } else if (this.authSource.getRotationIndices().length > 0) { // Don't set isSystemBusy here - let switchToNextAuth manage it @@ -4135,6 +4141,14 @@ class RequestHandler { _generateRequestAttemptId(requestId, attemptNumber) { return `${requestId}_attempt_${attemptNumber}_${Math.random().toString(36).substring(2, 8)}`; } + + _withTimeout(promise, ms, message) { + let timer; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(message)), ms); + }); + return Promise.race([promise, timeout]).finally(() => clearTimeout(timer)); + } } module.exports = RequestHandler;