diff --git a/src/core/BrowserManager.js b/src/core/BrowserManager.js index b976c68..9406c86 100644 --- a/src/core/BrowserManager.js +++ b/src/core/BrowserManager.js @@ -55,6 +55,9 @@ class BrowserManager { // ConnectionRegistry reference (set after construction to avoid circular dependency) this.connectionRegistry = null; + this._onAuthQueuesDrained = null; + this._isSystemBusyProvider = null; + this.pendingContextClosures = new Map(); // Background wakeup service status (instance-level, tracks this.page) // Prevents multiple BackgroundWakeup instances from running simultaneously @@ -148,7 +151,33 @@ class BrowserManager { * @param {ConnectionRegistry} connectionRegistry - The ConnectionRegistry instance */ setConnectionRegistry(connectionRegistry) { + if (this.connectionRegistry && this._onAuthQueuesDrained) { + this.connectionRegistry.off("authQueuesDrained", this._onAuthQueuesDrained); + } this.connectionRegistry = connectionRegistry; + if (this.connectionRegistry) { + this._onAuthQueuesDrained = authIndex => { + this._closePendingContextIfIdle(authIndex).catch(error => { + this.logger.error( + `[ContextPool] Failed to close pending context #${authIndex} after queue drain: ${error.message}` + ); + }); + }; + this.connectionRegistry.on("authQueuesDrained", this._onAuthQueuesDrained); + } + } + + setSystemBusyProvider(provider) { + this._isSystemBusyProvider = typeof provider === "function" ? provider : null; + } + + _isSystemBusy() { + try { + return this._isSystemBusyProvider?.() === true; + } catch (error) { + this.logger.warn(`[ContextPool] Failed to read system busy state: ${error.message}`); + return false; + } } /** @@ -1532,6 +1561,112 @@ class BrowserManager { }); } + _getActiveQueueCountForAuth(authIndex) { + if (!this.connectionRegistry?.getMessageQueueCountForAuth) { + return 0; + } + return this.connectionRegistry.getMessageQueueCountForAuth(authIndex); + } + + _cancelPendingContextClosure(authIndex, reason = "closure_cancelled") { + if (!this.pendingContextClosures.has(authIndex)) { + return false; + } + this.pendingContextClosures.delete(authIndex); + this.logger.info(`[ContextPool] Cancelled pending close for context #${authIndex} (${reason}).`); + return true; + } + + _scheduleContextClosureWhenIdle(authIndex, reason, activeQueueCount) { + if (!this.contexts.has(authIndex)) { + return false; + } + const existingReason = this.pendingContextClosures.get(authIndex); + if (existingReason) { + this.logger.debug( + `[ContextPool] Context #${authIndex} is already pending close (${existingReason}), active queues: ${activeQueueCount}` + ); + return false; + } + this.pendingContextClosures.set(authIndex, reason); + this.logger.info( + `[ContextPool] Deferring close for busy context #${authIndex} (${activeQueueCount} active queue(s), reason: ${reason})` + ); + return false; + } + + async _closeContextForPoolIfPossible(authIndex, reason) { + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + return this._scheduleContextClosureWhenIdle(authIndex, reason, activeQueueCount); + } + + this.pendingContextClosures.delete(authIndex); + await this.closeContext(authIndex); + return true; + } + + async _closePendingContextIfIdle(authIndex) { + if (!this.pendingContextClosures.has(authIndex)) { + return false; + } + if (authIndex === this._currentAuthIndex) { + this.logger.debug( + `[ContextPool] Skipping pending close for context #${authIndex} because it is active again as current.` + ); + return false; + } + if (!this.contexts.has(authIndex)) { + this.pendingContextClosures.delete(authIndex); + return false; + } + + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + this.logger.debug( + `[ContextPool] Pending close for context #${authIndex} is still waiting on ${activeQueueCount} active queue(s).` + ); + return false; + } + + const pendingReason = this.pendingContextClosures.get(authIndex); + this.pendingContextClosures.delete(authIndex); + this.logger.info( + `[ContextPool] Closing deferred context #${authIndex} now that all queues are drained (reason: ${pendingReason}).` + ); + await this.closeContext(authIndex); + if (this._isSystemBusy()) { + this.logger.info("[ContextPool] Skipping rebalance after deferred close because system is busy."); + return true; + } + this.rebalanceContextPool().catch(error => { + this.logger.error(`[ContextPool] Rebalance after deferred close failed: ${error.message}`); + }); + return true; + } + + async _flushPendingContextClosures() { + for (const authIndex of [...this.pendingContextClosures.keys()]) { + await this._closePendingContextIfIdle(authIndex); + } + } + + _prioritizeContextsForRemoval(indices) { + const idle = []; + const busy = []; + + for (const authIndex of indices) { + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + busy.push({ activeQueueCount, authIndex }); + } else { + idle.push(authIndex); + } + } + + return { busy, idle }; + } + /** * Internal method to execute the actual preload task * @private @@ -1754,15 +1889,28 @@ class BrowserManager { } } - // Remove contexts according to priority until we have enough space - const toRemove = removalPriority.slice(0, removeCount); + const { busy, idle } = this._prioritizeContextsForRemoval(removalPriority); + const toRemoveNow = idle.slice(0, removeCount); + const toDefer = busy.slice(0, Math.max(0, removeCount - toRemoveNow.length)); this.logger.info( - `[ContextPool] Pre-cleanup: removing ${toRemove.length} contexts before switch to #${targetAuthIndex}: [${toRemove}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)` + `[ContextPool] Pre-cleanup before switch to #${targetAuthIndex}: immediate=[${toRemoveNow}], deferred=[${toDefer.map( + entry => `${entry.authIndex}:${entry.activeQueueCount}` + )}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)` ); - for (const idx of toRemove) { - await this.closeContext(idx); + for (const idx of toRemoveNow) { + await this._closeContextForPoolIfPossible(idx, "pre_cleanup_for_switch"); + } + + for (const entry of toDefer) { + this._scheduleContextClosureWhenIdle(entry.authIndex, "pre_cleanup_for_switch", entry.activeQueueCount); + } + + if (toDefer.length > 0) { + this.logger.warn( + `[ContextPool] Allowing temporary MAX_CONTEXTS overflow while busy contexts drain before switch to #${targetAuthIndex}.` + ); } } @@ -1796,6 +1944,10 @@ class BrowserManager { targets = new Set(ordered.slice(0, maxContexts)); } + for (const idx of targets) { + this._cancelPendingContextClosure(idx, "rebalance_target"); + } + // Remove contexts not in targets (except current) // Special handling: if current account is a duplicate (old version), also remove its canonical version // BUT only in limited mode - in unlimited mode, keep all contexts @@ -1835,16 +1987,25 @@ class BrowserManager { // If a foreground task is running, _executePreloadTask will skip it (line 1382) const candidates = ordered.filter(idx => !activeContexts.has(idx)); + const { busy, idle } = this._prioritizeContextsForRemoval(toRemove); + this.logger.info( - `[ContextPool] Rebalance: targets=[${[...targets]}], remove=[${toRemove}], candidates=[${candidates}]` + `[ContextPool] Rebalance: targets=[${[...targets]}], removeNow=[${idle}], removeDeferred=[${busy.map( + entry => `${entry.authIndex}:${entry.activeQueueCount}` + )}], candidates=[${candidates}]` ); - for (const idx of toRemove) { - await this.closeContext(idx); + for (const idx of idle) { + await this._closeContextForPoolIfPossible(idx, "rebalance"); + } + + for (const entry of busy) { + this._scheduleContextClosureWhenIdle(entry.authIndex, "rebalance", entry.activeQueueCount); } - // Preload candidates if we have room in the pool - if (candidates.length > 0 && (isUnlimited || this.contexts.size < maxContexts)) { + // Preload candidates if ready and initializing contexts still leave room in the pool + const poolOccupancy = this.contexts.size + this.initializingContexts.size; + if (candidates.length > 0 && (isUnlimited || poolOccupancy < maxContexts)) { this._preloadBackgroundContexts(candidates, isUnlimited ? 0 : maxContexts); } } @@ -2115,6 +2276,8 @@ class BrowserManager { throw new Error(`Invalid authIndex: ${authIndex}. Must be >= 0.`); } + this._cancelPendingContextClosure(authIndex, "context_reused"); + // [Auth Switch] Save current auth data before switching if (this.browser && this._currentAuthIndex >= 0 && this._currentAuthIndex !== authIndex) { try { @@ -2194,6 +2357,7 @@ class BrowserManager { // Switch to new context this._activateContext(contextData.context, contextData.page, authIndex); + await this._flushPendingContextClosures(); this.logger.info(`✅ [FastSwitch] Switched to account #${authIndex} instantly!`); return; @@ -2249,6 +2413,7 @@ class BrowserManager { const { context, page } = await this._initializeContext(authIndex, false); this._activateContext(context, page, authIndex); + await this._flushPendingContextClosures(); // If this account was marked as expired but login succeeded, restore it if (this.authSource.isExpired(authIndex)) { @@ -2456,6 +2621,8 @@ class BrowserManager { * @param {number} authIndex - The auth index to close */ async closeContext(authIndex) { + this.pendingContextClosures.delete(authIndex); + // If context is being initialized in background, signal abort and wait if (this.initializingContexts.has(authIndex)) { this.logger.info(`[Browser] Context #${authIndex} is being initialized, marking for abort and waiting...`); @@ -2549,6 +2716,7 @@ class BrowserManager { this.contexts.clear(); this.initializingContexts.clear(); this.abortedContexts.clear(); + this.pendingContextClosures.clear(); this._wsInitState.clear(); this.context = null; this.page = null; diff --git a/src/core/ConnectionRegistry.js b/src/core/ConnectionRegistry.js index 5be2107..73082be 100644 --- a/src/core/ConnectionRegistry.js +++ b/src/core/ConnectionRegistry.js @@ -426,6 +426,7 @@ class ConnectionRegistry extends EventEmitter { // This prevents stale queues from lingering when retrying failed requests const existingEntry = this.messageQueues.get(requestId); if (existingEntry) { + const existingAuthIndex = existingEntry.authIndex; this.logger.debug( `[Registry] Found existing message queue for request ${requestId} (authIndex=${existingEntry.authIndex}), closing it before creating new one` ); @@ -435,6 +436,7 @@ class ConnectionRegistry extends EventEmitter { this.logger.debug(`[Registry] Failed to close existing queue for ${requestId}: ${e.message}`); } this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(existingAuthIndex); } const queue = new MessageQueue(); @@ -456,8 +458,10 @@ class ConnectionRegistry extends EventEmitter { removeMessageQueue(requestId, reason = "handler_cleanup") { const entry = this.messageQueues.get(requestId); if (entry) { + const authIndex = entry.authIndex; entry.queue.close(reason); this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(authIndex); } } @@ -481,6 +485,21 @@ class ConnectionRegistry extends EventEmitter { return entry ? entry.requestAttemptId || null : null; } + /** + * Get the number of active message queues for a specific account + * @param {number} authIndex - The account index to inspect + * @returns {number} Number of active message queues + */ + getMessageQueueCountForAuth(authIndex) { + let count = 0; + for (const entry of this.messageQueues.values()) { + if (entry.authIndex === authIndex) { + count++; + } + } + return count; + } + /** * Close all message queues belonging to a specific account * @param {number} authIndex - The account whose queues should be closed @@ -505,6 +524,7 @@ class ConnectionRegistry extends EventEmitter { `[Registry] Force closed ${count} pending message queue(s) for account #${authIndex} (reason: ${reason})` ); } + this._emitAuthQueuesDrainedIfNeeded(authIndex); return count; } @@ -548,6 +568,7 @@ class ConnectionRegistry extends EventEmitter { this.logger.debug(`[Registry] Failed to close stale queue for ${requestId}: ${e.message}`); } this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(entry.authIndex); cleanedCount++; } } @@ -559,6 +580,15 @@ class ConnectionRegistry extends EventEmitter { return cleanedCount; } + _emitAuthQueuesDrainedIfNeeded(authIndex) { + if (!Number.isInteger(authIndex) || authIndex < 0) { + return; + } + if (this.getMessageQueueCountForAuth(authIndex) === 0) { + this.emit("authQueuesDrained", authIndex); + } + } + /** * Safely close a WebSocket connection with readyState check * @param {WebSocket} ws - The WebSocket to close diff --git a/src/core/ProxyServerSystem.js b/src/core/ProxyServerSystem.js index 5699a3c..5a34241 100644 --- a/src/core/ProxyServerSystem.js +++ b/src/core/ProxyServerSystem.js @@ -111,6 +111,7 @@ class ProxyServerSystem extends EventEmitter { this.config, this.authSource ); + this.browserManager.setSystemBusyProvider(() => this.requestHandler?.isSystemBusy === true); this.httpServer = null; this.wsServer = null;