diff --git a/src/core/BrowserManager.js b/src/core/BrowserManager.js index b976c68..10dd199 100644 --- a/src/core/BrowserManager.js +++ b/src/core/BrowserManager.js @@ -55,6 +55,9 @@ class BrowserManager { // ConnectionRegistry reference (set after construction to avoid circular dependency) this.connectionRegistry = null; + this._onAuthQueuesDrained = null; + this._isSystemBusyProvider = null; + this.pendingContextClosures = new Map(); // Background wakeup service status (instance-level, tracks this.page) // Prevents multiple BackgroundWakeup instances from running simultaneously @@ -148,7 +151,33 @@ class BrowserManager { * @param {ConnectionRegistry} connectionRegistry - The ConnectionRegistry instance */ setConnectionRegistry(connectionRegistry) { + if (this.connectionRegistry && this._onAuthQueuesDrained) { + this.connectionRegistry.off("authQueuesDrained", this._onAuthQueuesDrained); + } this.connectionRegistry = connectionRegistry; + if (this.connectionRegistry) { + this._onAuthQueuesDrained = authIndex => { + this._closePendingContextIfIdle(authIndex).catch(error => { + this.logger.error( + `[ContextPool] Failed to close pending context #${authIndex} after queue drain: ${error.message}` + ); + }); + }; + this.connectionRegistry.on("authQueuesDrained", this._onAuthQueuesDrained); + } + } + + setSystemBusyProvider(provider) { + this._isSystemBusyProvider = typeof provider === "function" ? provider : null; + } + + _isSystemBusy() { + try { + return this._isSystemBusyProvider?.() === true; + } catch (error) { + this.logger.warn(`[ContextPool] Failed to read system busy state: ${error.message}`); + return false; + } } /** @@ -1532,6 +1561,109 @@ class BrowserManager { }); } + _hasActiveQueueForAuth(authIndex) { + if (this.connectionRegistry?.hasMessageQueueForAuth) { + return this.connectionRegistry.hasMessageQueueForAuth(authIndex); + } + return false; + } + + _cancelPendingContextClosure(authIndex, reason = "closure_cancelled") { + if (!this.pendingContextClosures.has(authIndex)) { + return false; + } + this.pendingContextClosures.delete(authIndex); + this.logger.info(`[ContextPool] Cancelled pending close for context #${authIndex} (${reason}).`); + return true; + } + + _scheduleContextClosureWhenIdle(authIndex, reason) { + if (!this.contexts.has(authIndex)) { + return false; + } + const existingReason = this.pendingContextClosures.get(authIndex); + if (existingReason) { + this.logger.debug( + `[ContextPool] Context #${authIndex} is already pending close (${existingReason}), active queues are still present` + ); + return false; + } + this.pendingContextClosures.set(authIndex, reason); + this.logger.info( + `[ContextPool] Deferring close for busy context #${authIndex} (active queue(s) present, reason: ${reason})` + ); + return false; + } + + async _closeContextForPoolIfPossible(authIndex, reason) { + if (this._hasActiveQueueForAuth(authIndex)) { + return this._scheduleContextClosureWhenIdle(authIndex, reason); + } + + this.pendingContextClosures.delete(authIndex); + await this.closeContext(authIndex); + return true; + } + + async _closePendingContextIfIdle(authIndex) { + if (!this.pendingContextClosures.has(authIndex)) { + return false; + } + if (authIndex === this._currentAuthIndex) { + this.logger.debug( + `[ContextPool] Skipping pending close for context #${authIndex} because it is active again as current.` + ); + return false; + } + if (!this.contexts.has(authIndex)) { + this.pendingContextClosures.delete(authIndex); + return false; + } + + if (this._hasActiveQueueForAuth(authIndex)) { + this.logger.debug( + `[ContextPool] Pending close for context #${authIndex} is still waiting on active queue(s).` + ); + return false; + } + + const pendingReason = this.pendingContextClosures.get(authIndex); + this.pendingContextClosures.delete(authIndex); + this.logger.info( + `[ContextPool] Closing deferred context #${authIndex} now that all queues are drained (reason: ${pendingReason}).` + ); + await this.closeContext(authIndex); + if (this._isSystemBusy()) { + this.logger.info("[ContextPool] Skipping rebalance after deferred close because system is busy."); + return true; + } + this.rebalanceContextPool().catch(error => { + this.logger.error(`[ContextPool] Rebalance after deferred close failed: ${error.message}`); + }); + return true; + } + + async _flushPendingContextClosures() { + for (const authIndex of [...this.pendingContextClosures.keys()]) { + await this._closePendingContextIfIdle(authIndex); + } + } + + _prioritizeContextsForRemoval(indices) { + const idle = []; + const busy = []; + + for (const authIndex of indices) { + if (this._hasActiveQueueForAuth(authIndex)) { + busy.push({ authIndex }); + } else { + idle.push(authIndex); + } + } + + return { busy, idle }; + } + /** * Internal method to execute the actual preload task * @private @@ -1754,15 +1886,28 @@ class BrowserManager { } } - // Remove contexts according to priority until we have enough space - const toRemove = removalPriority.slice(0, removeCount); + const { busy, idle } = this._prioritizeContextsForRemoval(removalPriority); + const toRemoveNow = idle.slice(0, removeCount); + const toDefer = busy.slice(0, Math.max(0, removeCount - toRemoveNow.length)); this.logger.info( - `[ContextPool] Pre-cleanup: removing ${toRemove.length} contexts before switch to #${targetAuthIndex}: [${toRemove}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)` + `[ContextPool] Pre-cleanup before switch to #${targetAuthIndex}: immediate=[${toRemoveNow}], deferred=[${toDefer.map( + entry => entry.authIndex + )}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)` ); - for (const idx of toRemove) { - await this.closeContext(idx); + for (const idx of toRemoveNow) { + await this._closeContextForPoolIfPossible(idx, "pre_cleanup_for_switch"); + } + + for (const entry of toDefer) { + this._scheduleContextClosureWhenIdle(entry.authIndex, "pre_cleanup_for_switch"); + } + + if (toDefer.length > 0) { + this.logger.warn( + `[ContextPool] Allowing temporary MAX_CONTEXTS overflow while busy contexts drain before switch to #${targetAuthIndex}.` + ); } } @@ -1796,6 +1941,10 @@ class BrowserManager { targets = new Set(ordered.slice(0, maxContexts)); } + for (const idx of targets) { + this._cancelPendingContextClosure(idx, "rebalance_target"); + } + // Remove contexts not in targets (except current) // Special handling: if current account is a duplicate (old version), also remove its canonical version // BUT only in limited mode - in unlimited mode, keep all contexts @@ -1835,16 +1984,25 @@ class BrowserManager { // If a foreground task is running, _executePreloadTask will skip it (line 1382) const candidates = ordered.filter(idx => !activeContexts.has(idx)); + const { busy, idle } = this._prioritizeContextsForRemoval(toRemove); + this.logger.info( - `[ContextPool] Rebalance: targets=[${[...targets]}], remove=[${toRemove}], candidates=[${candidates}]` + `[ContextPool] Rebalance: targets=[${[...targets]}], removeNow=[${idle}], removeDeferred=[${busy.map( + entry => entry.authIndex + )}], candidates=[${candidates}]` ); - for (const idx of toRemove) { - await this.closeContext(idx); + for (const idx of idle) { + await this._closeContextForPoolIfPossible(idx, "rebalance"); + } + + for (const entry of busy) { + this._scheduleContextClosureWhenIdle(entry.authIndex, "rebalance"); } - // Preload candidates if we have room in the pool - if (candidates.length > 0 && (isUnlimited || this.contexts.size < maxContexts)) { + // Preload candidates if ready and initializing contexts still leave room in the pool + const poolOccupancy = this.contexts.size + this.initializingContexts.size; + if (candidates.length > 0 && (isUnlimited || poolOccupancy < maxContexts)) { this._preloadBackgroundContexts(candidates, isUnlimited ? 0 : maxContexts); } } @@ -2115,6 +2273,8 @@ class BrowserManager { throw new Error(`Invalid authIndex: ${authIndex}. Must be >= 0.`); } + this._cancelPendingContextClosure(authIndex, "context_reused"); + // [Auth Switch] Save current auth data before switching if (this.browser && this._currentAuthIndex >= 0 && this._currentAuthIndex !== authIndex) { try { @@ -2194,6 +2354,7 @@ class BrowserManager { // Switch to new context this._activateContext(contextData.context, contextData.page, authIndex); + await this._flushPendingContextClosures(); this.logger.info(`✅ [FastSwitch] Switched to account #${authIndex} instantly!`); return; @@ -2249,6 +2410,7 @@ class BrowserManager { const { context, page } = await this._initializeContext(authIndex, false); this._activateContext(context, page, authIndex); + await this._flushPendingContextClosures(); // If this account was marked as expired but login succeeded, restore it if (this.authSource.isExpired(authIndex)) { @@ -2456,6 +2618,8 @@ class BrowserManager { * @param {number} authIndex - The auth index to close */ async closeContext(authIndex) { + this.pendingContextClosures.delete(authIndex); + // If context is being initialized in background, signal abort and wait if (this.initializingContexts.has(authIndex)) { this.logger.info(`[Browser] Context #${authIndex} is being initialized, marking for abort and waiting...`); @@ -2549,6 +2713,7 @@ class BrowserManager { this.contexts.clear(); this.initializingContexts.clear(); this.abortedContexts.clear(); + this.pendingContextClosures.clear(); this._wsInitState.clear(); this.context = null; this.page = null; diff --git a/src/core/ConnectionRegistry.js b/src/core/ConnectionRegistry.js index 5be2107..9618256 100644 --- a/src/core/ConnectionRegistry.js +++ b/src/core/ConnectionRegistry.js @@ -309,13 +309,19 @@ class ConnectionRegistry extends EventEmitter { getConnectionByAuth(authIndex, log = true) { const connection = this.connectionsByAuth.get(authIndex); - if (connection && log) { + + if (!log) { + return connection; + } + + if (connection) { this.logger.debug(`[Registry] Found WebSocket connection for authIndex=${authIndex}`); } else if (this.logger.getLevel?.() === "DEBUG") { this.logger.debug( `[Registry] No WebSocket connection found for authIndex=${authIndex}. Available: [${Array.from(this.connectionsByAuth.keys()).join(", ")}]` ); } + return connection; } @@ -426,6 +432,7 @@ class ConnectionRegistry extends EventEmitter { // This prevents stale queues from lingering when retrying failed requests const existingEntry = this.messageQueues.get(requestId); if (existingEntry) { + const existingAuthIndex = existingEntry.authIndex; this.logger.debug( `[Registry] Found existing message queue for request ${requestId} (authIndex=${existingEntry.authIndex}), closing it before creating new one` ); @@ -435,6 +442,9 @@ class ConnectionRegistry extends EventEmitter { this.logger.debug(`[Registry] Failed to close existing queue for ${requestId}: ${e.message}`); } this.messageQueues.delete(requestId); + if (existingAuthIndex !== authIndex) { + this._emitAuthQueuesDrainedIfNeeded(existingAuthIndex); + } } const queue = new MessageQueue(); @@ -456,8 +466,10 @@ class ConnectionRegistry extends EventEmitter { removeMessageQueue(requestId, reason = "handler_cleanup") { const entry = this.messageQueues.get(requestId); if (entry) { + const authIndex = entry.authIndex; entry.queue.close(reason); this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(authIndex); } } @@ -481,6 +493,20 @@ class ConnectionRegistry extends EventEmitter { return entry ? entry.requestAttemptId || null : null; } + /** + * Check whether a specific account has any active message queue. + * @param {number} authIndex - The account index to inspect + * @returns {boolean} True if at least one active message queue belongs to the account + */ + hasMessageQueueForAuth(authIndex) { + for (const entry of this.messageQueues.values()) { + if (entry.authIndex === authIndex) { + return true; + } + } + return false; + } + /** * Close all message queues belonging to a specific account * @param {number} authIndex - The account whose queues should be closed @@ -504,6 +530,7 @@ class ConnectionRegistry extends EventEmitter { this.logger.info( `[Registry] Force closed ${count} pending message queue(s) for account #${authIndex} (reason: ${reason})` ); + this._emitAuthQueuesDrainedIfNeeded(authIndex); } return count; } @@ -548,6 +575,7 @@ class ConnectionRegistry extends EventEmitter { this.logger.debug(`[Registry] Failed to close stale queue for ${requestId}: ${e.message}`); } this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(entry.authIndex); cleanedCount++; } } @@ -559,6 +587,15 @@ class ConnectionRegistry extends EventEmitter { return cleanedCount; } + _emitAuthQueuesDrainedIfNeeded(authIndex) { + if (!Number.isInteger(authIndex) || authIndex < 0) { + return; + } + if (!this.hasMessageQueueForAuth(authIndex)) { + this.emit("authQueuesDrained", authIndex); + } + } + /** * Safely close a WebSocket connection with readyState check * @param {WebSocket} ws - The WebSocket to close diff --git a/src/core/ProxyServerSystem.js b/src/core/ProxyServerSystem.js index 5699a3c..5a34241 100644 --- a/src/core/ProxyServerSystem.js +++ b/src/core/ProxyServerSystem.js @@ -111,6 +111,7 @@ class ProxyServerSystem extends EventEmitter { this.config, this.authSource ); + this.browserManager.setSystemBusyProvider(() => this.requestHandler?.isSystemBusy === true); this.httpServer = null; this.wsServer = null; diff --git a/src/core/RequestHandler.js b/src/core/RequestHandler.js index db96f8d..e4a77fb 100644 --- a/src/core/RequestHandler.js +++ b/src/core/RequestHandler.js @@ -64,6 +64,10 @@ class RequestHandler { return this.authSwitcher.isSystemBusy; } + set isSystemBusy(value) { + this.authSwitcher.isSystemBusy = value === true; + } + _getUsageStatsService() { return this.serverSystem.usageStatsService || null; } @@ -537,10 +541,6 @@ class RequestHandler { } async _waitForSystemAndConnectionIfBusy(res = null, options = {}) { - if (!this.authSwitcher.isSystemBusy) { - return true; - } - const { busyMessage = "Server undergoing internal maintenance (account switching/recovery), please try again later.", connectionMessage = "Service temporarily unavailable: Connection not established after switching.", @@ -732,6 +732,7 @@ class RequestHandler { this.authSwitcher.isSystemBusy = true; this.logger.info(`[System] Set isSystemBusy=true for direct recovery to account #${recoveryAuthIndex}`); + await this.browserManager.preCleanupForSwitch(recoveryAuthIndex); await this.browserManager.launchOrSwitchContext(recoveryAuthIndex); this.logger.info(`✅ [System] Browser successfully recovered to account #${recoveryAuthIndex}!`); @@ -3217,8 +3218,12 @@ class RequestHandler { async _executeRequestWithRetries(proxyRequest, messageQueue) { let lastError = null; let currentQueue = messageQueue; - // Track the authIndex for the current queue to ensure proper cleanup - let currentQueueAuthIndex = this.currentAuthIndex; + const registeredQueueAuthIndex = this.connectionRegistry.getAuthIndexForRequest(proxyRequest.request_id); + // Track the authIndex registered for the current queue, which may differ from the global current account. + let currentQueueAuthIndex = + Number.isInteger(registeredQueueAuthIndex) && registeredQueueAuthIndex >= 0 + ? registeredQueueAuthIndex + : this.currentAuthIndex; let retryAttempt = 1; const immediateSwitchTracker = this._createImmediateSwitchTracker(currentQueueAuthIndex); @@ -3271,10 +3276,61 @@ class RequestHandler { // Check the actual closure reason to provide accurate error messages const reason = error.reason || "unknown"; const isClientDisconnect = reason === "client_disconnect"; + const currentAuthIndex = this.currentAuthIndex; + const isClosedAccountRetryable = reason === "context_closed" || reason === "page_closed"; + const canRetryOnCurrentAccountCandidate = + !isClientDisconnect && + isClosedAccountRetryable && + retryAttempt < this.maxRetries && + Number.isInteger(currentQueueAuthIndex) && + currentQueueAuthIndex >= 0 && + Number.isInteger(currentAuthIndex) && + currentAuthIndex >= 0 && + currentQueueAuthIndex !== currentAuthIndex; + + if (canRetryOnCurrentAccountCandidate) { + const ready = await this._waitForSystemAndConnectionIfBusy(null, { + connectionMessage: "Service temporarily unavailable: Connection not ready before retry.", + }); + if (!ready) { + lastError = { + message: `WebSocket connection not ready before retry on account #${this.currentAuthIndex}.`, + status: 503, + }; + break; + } + } + + const canRetryOnCurrentAccount = + canRetryOnCurrentAccountCandidate && + Boolean(this.connectionRegistry.getConnectionByAuth(currentAuthIndex, false)); if (isClientDisconnect) { this.logger.warn(`[Request] Message queue closed due to client disconnect, aborting retries.`); lastError = { message: "Connection lost (client disconnect)", status: 503 }; + } else if (canRetryOnCurrentAccount) { + this.logger.warn( + `[Request] Message queue for non-current account #${currentQueueAuthIndex} closed ` + + `(reason: ${reason}); retrying request #${proxyRequest.request_id} on current account #${currentAuthIndex}.` + ); + lastError = { + message: `Queue closed: ${error.message || reason}`, + reason, + status: 503, + }; + this._advanceProxyRequestAttempt(proxyRequest); + currentQueue = this.connectionRegistry.createMessageQueue( + proxyRequest.request_id, + currentAuthIndex, + proxyRequest.request_attempt_id + ); + currentQueueAuthIndex = currentAuthIndex; + if (Number.isInteger(currentQueueAuthIndex) && currentQueueAuthIndex >= 0) { + immediateSwitchTracker.attemptedAuthIndices.add(currentQueueAuthIndex); + } + await new Promise(resolve => setTimeout(resolve, this.retryDelay)); + retryAttempt++; + continue; } else { // Queue closed for other reasons (account_switch, system_reset, etc.) this.logger.warn(`[Request] Message queue closed (reason: ${reason}), aborting retries.`); @@ -3377,6 +3433,17 @@ class RequestHandler { // Wait before the next retry await new Promise(resolve => setTimeout(resolve, this.retryDelay)); + if ( + !(await this._waitForSystemAndConnectionIfBusy(null, { + connectionMessage: "Service temporarily unavailable: Connection not ready before retry.", + })) + ) { + lastError = { + message: `WebSocket connection not ready before retry on account #${this.currentAuthIndex}.`, + status: 503, + }; + break; + } retryAttempt++; } } diff --git a/src/routes/StatusRoutes.js b/src/routes/StatusRoutes.js index bfc2570..9b0b456 100644 --- a/src/routes/StatusRoutes.js +++ b/src/routes/StatusRoutes.js @@ -467,9 +467,9 @@ class StatusRoutes { `[WebUI] Current active account #${currentAuthIndex} was deleted. Closing context and connection...` ); // Set system busy flag to prevent new requests during cleanup - const previousBusy = this.serverSystem.isSystemBusy === true; + const previousBusy = this.serverSystem.requestHandler.isSystemBusy === true; if (!previousBusy) { - this.serverSystem.isSystemBusy = true; + this.serverSystem.requestHandler.isSystemBusy = true; } try { // 1. Terminate pending requests for the current account @@ -484,7 +484,7 @@ class StatusRoutes { } finally { // Reset system busy flag after cleanup completes if (!previousBusy) { - this.serverSystem.isSystemBusy = false; + this.serverSystem.requestHandler.isSystemBusy = false; } } } @@ -657,9 +657,9 @@ class StatusRoutes { if (targetIndex === currentAuthIndex) { // Set system busy flag to prevent new requests during cleanup - const previousBusy = this.serverSystem.isSystemBusy === true; + const previousBusy = this.serverSystem.requestHandler.isSystemBusy === true; if (!previousBusy) { - this.serverSystem.isSystemBusy = true; + this.serverSystem.requestHandler.isSystemBusy = true; } try { // If deleting the current account, terminate its pending requests first @@ -671,7 +671,7 @@ class StatusRoutes { } finally { // Reset system busy flag after cleanup completes if (!previousBusy) { - this.serverSystem.isSystemBusy = false; + this.serverSystem.requestHandler.isSystemBusy = false; } } } else {