From 5bd0eaa0b889b16ccd459d5323d9effb21877818 Mon Sep 17 00:00:00 2001 From: Nekohy Date: Thu, 30 Apr 2026 15:29:59 +0800 Subject: [PATCH 1/3] =?UTF-8?q?fix:=E9=AB=98=E5=B9=B6=E5=8F=91=E4=B8=8B503?= =?UTF-8?q?=E7=AA=97=E5=8F=A3=E5=85=B3=E9=97=AD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/BrowserManager.js | 162 +++++++++++++++++++++++++++++++-- src/core/ConnectionRegistry.js | 30 ++++++ 2 files changed, 184 insertions(+), 8 deletions(-) diff --git a/src/core/BrowserManager.js b/src/core/BrowserManager.js index b976c68..57dd1ed 100644 --- a/src/core/BrowserManager.js +++ b/src/core/BrowserManager.js @@ -55,6 +55,8 @@ class BrowserManager { // ConnectionRegistry reference (set after construction to avoid circular dependency) this.connectionRegistry = null; + this._onAuthQueuesDrained = null; + this.pendingContextClosures = new Map(); // Background wakeup service status (instance-level, tracks this.page) // Prevents multiple BackgroundWakeup instances from running simultaneously @@ -148,7 +150,20 @@ class BrowserManager { * @param {ConnectionRegistry} connectionRegistry - The ConnectionRegistry instance */ setConnectionRegistry(connectionRegistry) { + if (this.connectionRegistry && this._onAuthQueuesDrained) { + this.connectionRegistry.off("authQueuesDrained", this._onAuthQueuesDrained); + } this.connectionRegistry = connectionRegistry; + if (this.connectionRegistry) { + this._onAuthQueuesDrained = authIndex => { + this._closePendingContextIfIdle(authIndex).catch(error => { + this.logger.error( + `[ContextPool] Failed to close pending context #${authIndex} after queue drain: ${error.message}` + ); + }); + }; + this.connectionRegistry.on("authQueuesDrained", this._onAuthQueuesDrained); + } } /** @@ -1532,6 +1547,105 @@ class BrowserManager { }); } + _getActiveQueueCountForAuth(authIndex) { + if (!this.connectionRegistry?.getMessageQueueCountForAuth) { + return 0; + } + return this.connectionRegistry.getMessageQueueCountForAuth(authIndex); + } + + _cancelPendingContextClosure(authIndex, reason = "closure_cancelled") { + if (!this.pendingContextClosures.has(authIndex)) { + return false; + } + this.pendingContextClosures.delete(authIndex); + this.logger.info(`[ContextPool] Cancelled pending close for context #${authIndex} (${reason}).`); + return true; + } + + _scheduleContextClosureWhenIdle(authIndex, reason, activeQueueCount) { + if (!this.contexts.has(authIndex)) { + return false; + } + const existingReason = this.pendingContextClosures.get(authIndex); + if (existingReason) { + this.logger.debug( + `[ContextPool] Context #${authIndex} is already pending close (${existingReason}), active queues: ${activeQueueCount}` + ); + return false; + } + this.pendingContextClosures.set(authIndex, reason); + this.logger.info( + `[ContextPool] Deferring close for busy context #${authIndex} (${activeQueueCount} active queue(s), reason: ${reason})` + ); + return false; + } + + async _closeContextForPoolIfPossible(authIndex, reason) { + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + return this._scheduleContextClosureWhenIdle(authIndex, reason, activeQueueCount); + } + + this.pendingContextClosures.delete(authIndex); + await this.closeContext(authIndex); + return true; + } + + async _closePendingContextIfIdle(authIndex) { + if (!this.pendingContextClosures.has(authIndex)) { + return false; + } + if (authIndex === this._currentAuthIndex) { + this.logger.debug( + `[ContextPool] Skipping pending close for context #${authIndex} because it is active again as current.` + ); + return false; + } + if (!this.contexts.has(authIndex)) { + this.pendingContextClosures.delete(authIndex); + return false; + } + + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + this.logger.debug( + `[ContextPool] Pending close for context #${authIndex} is still waiting on ${activeQueueCount} active queue(s).` + ); + return false; + } + + const pendingReason = this.pendingContextClosures.get(authIndex); + this.pendingContextClosures.delete(authIndex); + this.logger.info( + `[ContextPool] Closing deferred context #${authIndex} now that all queues are drained (reason: ${pendingReason}).` + ); + await this.closeContext(authIndex); + return true; + } + + async _flushPendingContextClosures() { + for (const authIndex of [...this.pendingContextClosures.keys()]) { + await this._closePendingContextIfIdle(authIndex); + } + } + + _prioritizeContextsForRemoval(indices) { + const idle = []; + const busy = []; + + for (const authIndex of indices) { + const activeQueueCount = this._getActiveQueueCountForAuth(authIndex); + if (activeQueueCount > 0) { + busy.push({ activeQueueCount, authIndex }); + } else { + idle.push(authIndex); + } + } + + return { busy, idle }; + } + /** * Internal method to execute the actual preload task * @private @@ -1754,15 +1868,32 @@ class BrowserManager { } } - // Remove contexts according to priority until we have enough space - const toRemove = removalPriority.slice(0, removeCount); + const { busy, idle } = this._prioritizeContextsForRemoval(removalPriority); + const toRemoveNow = idle.slice(0, removeCount); + const toDefer = busy.slice(0, Math.max(0, removeCount - toRemoveNow.length)); this.logger.info( - `[ContextPool] Pre-cleanup: removing ${toRemove.length} contexts before switch to #${targetAuthIndex}: [${toRemove}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)` + `[ContextPool] Pre-cleanup before switch to #${targetAuthIndex}: immediate=[${toRemoveNow}], deferred=[${toDefer.map( + entry => `${entry.authIndex}:${entry.activeQueueCount}` + )}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)` ); - for (const idx of toRemove) { - await this.closeContext(idx); + for (const idx of toRemoveNow) { + await this._closeContextForPoolIfPossible(idx, "pre_cleanup_for_switch"); + } + + for (const entry of toDefer) { + this._scheduleContextClosureWhenIdle( + entry.authIndex, + "pre_cleanup_for_switch", + entry.activeQueueCount + ); + } + + if (toDefer.length > 0) { + this.logger.warn( + `[ContextPool] Allowing temporary MAX_CONTEXTS overflow while busy contexts drain before switch to #${targetAuthIndex}.` + ); } } @@ -1835,12 +1966,20 @@ class BrowserManager { // If a foreground task is running, _executePreloadTask will skip it (line 1382) const candidates = ordered.filter(idx => !activeContexts.has(idx)); + const { busy, idle } = this._prioritizeContextsForRemoval(toRemove); + this.logger.info( - `[ContextPool] Rebalance: targets=[${[...targets]}], remove=[${toRemove}], candidates=[${candidates}]` + `[ContextPool] Rebalance: targets=[${[...targets]}], removeNow=[${idle}], removeDeferred=[${busy.map( + entry => `${entry.authIndex}:${entry.activeQueueCount}` + )}], candidates=[${candidates}]` ); - for (const idx of toRemove) { - await this.closeContext(idx); + for (const idx of idle) { + await this._closeContextForPoolIfPossible(idx, "rebalance"); + } + + for (const entry of busy) { + this._scheduleContextClosureWhenIdle(entry.authIndex, "rebalance", entry.activeQueueCount); } // Preload candidates if we have room in the pool @@ -2115,6 +2254,8 @@ class BrowserManager { throw new Error(`Invalid authIndex: ${authIndex}. Must be >= 0.`); } + this._cancelPendingContextClosure(authIndex, "context_reused"); + // [Auth Switch] Save current auth data before switching if (this.browser && this._currentAuthIndex >= 0 && this._currentAuthIndex !== authIndex) { try { @@ -2194,6 +2335,7 @@ class BrowserManager { // Switch to new context this._activateContext(contextData.context, contextData.page, authIndex); + await this._flushPendingContextClosures(); this.logger.info(`✅ [FastSwitch] Switched to account #${authIndex} instantly!`); return; @@ -2249,6 +2391,7 @@ class BrowserManager { const { context, page } = await this._initializeContext(authIndex, false); this._activateContext(context, page, authIndex); + await this._flushPendingContextClosures(); // If this account was marked as expired but login succeeded, restore it if (this.authSource.isExpired(authIndex)) { @@ -2456,6 +2599,8 @@ class BrowserManager { * @param {number} authIndex - The auth index to close */ async closeContext(authIndex) { + this.pendingContextClosures.delete(authIndex); + // If context is being initialized in background, signal abort and wait if (this.initializingContexts.has(authIndex)) { this.logger.info(`[Browser] Context #${authIndex} is being initialized, marking for abort and waiting...`); @@ -2549,6 +2694,7 @@ class BrowserManager { this.contexts.clear(); this.initializingContexts.clear(); this.abortedContexts.clear(); + this.pendingContextClosures.clear(); this._wsInitState.clear(); this.context = null; this.page = null; diff --git a/src/core/ConnectionRegistry.js b/src/core/ConnectionRegistry.js index 5be2107..73082be 100644 --- a/src/core/ConnectionRegistry.js +++ b/src/core/ConnectionRegistry.js @@ -426,6 +426,7 @@ class ConnectionRegistry extends EventEmitter { // This prevents stale queues from lingering when retrying failed requests const existingEntry = this.messageQueues.get(requestId); if (existingEntry) { + const existingAuthIndex = existingEntry.authIndex; this.logger.debug( `[Registry] Found existing message queue for request ${requestId} (authIndex=${existingEntry.authIndex}), closing it before creating new one` ); @@ -435,6 +436,7 @@ class ConnectionRegistry extends EventEmitter { this.logger.debug(`[Registry] Failed to close existing queue for ${requestId}: ${e.message}`); } this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(existingAuthIndex); } const queue = new MessageQueue(); @@ -456,8 +458,10 @@ class ConnectionRegistry extends EventEmitter { removeMessageQueue(requestId, reason = "handler_cleanup") { const entry = this.messageQueues.get(requestId); if (entry) { + const authIndex = entry.authIndex; entry.queue.close(reason); this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(authIndex); } } @@ -481,6 +485,21 @@ class ConnectionRegistry extends EventEmitter { return entry ? entry.requestAttemptId || null : null; } + /** + * Get the number of active message queues for a specific account + * @param {number} authIndex - The account index to inspect + * @returns {number} Number of active message queues + */ + getMessageQueueCountForAuth(authIndex) { + let count = 0; + for (const entry of this.messageQueues.values()) { + if (entry.authIndex === authIndex) { + count++; + } + } + return count; + } + /** * Close all message queues belonging to a specific account * @param {number} authIndex - The account whose queues should be closed @@ -505,6 +524,7 @@ class ConnectionRegistry extends EventEmitter { `[Registry] Force closed ${count} pending message queue(s) for account #${authIndex} (reason: ${reason})` ); } + this._emitAuthQueuesDrainedIfNeeded(authIndex); return count; } @@ -548,6 +568,7 @@ class ConnectionRegistry extends EventEmitter { this.logger.debug(`[Registry] Failed to close stale queue for ${requestId}: ${e.message}`); } this.messageQueues.delete(requestId); + this._emitAuthQueuesDrainedIfNeeded(entry.authIndex); cleanedCount++; } } @@ -559,6 +580,15 @@ class ConnectionRegistry extends EventEmitter { return cleanedCount; } + _emitAuthQueuesDrainedIfNeeded(authIndex) { + if (!Number.isInteger(authIndex) || authIndex < 0) { + return; + } + if (this.getMessageQueueCountForAuth(authIndex) === 0) { + this.emit("authQueuesDrained", authIndex); + } + } + /** * Safely close a WebSocket connection with readyState check * @param {WebSocket} ws - The WebSocket to close From f974286c607a67f2a91a97f803f5c42451f97833 Mon Sep 17 00:00:00 2001 From: bbbugg Date: Tue, 12 May 2026 15:51:12 +0800 Subject: [PATCH 2/3] chore: add context pool rebalance handling and prevent premature context closures --- src/core/BrowserManager.js | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/core/BrowserManager.js b/src/core/BrowserManager.js index 57dd1ed..4fa11ad 100644 --- a/src/core/BrowserManager.js +++ b/src/core/BrowserManager.js @@ -1621,6 +1621,9 @@ class BrowserManager { `[ContextPool] Closing deferred context #${authIndex} now that all queues are drained (reason: ${pendingReason}).` ); await this.closeContext(authIndex); + this.rebalanceContextPool().catch(error => { + this.logger.error(`[ContextPool] Rebalance after deferred close failed: ${error.message}`); + }); return true; } @@ -1883,11 +1886,7 @@ class BrowserManager { } for (const entry of toDefer) { - this._scheduleContextClosureWhenIdle( - entry.authIndex, - "pre_cleanup_for_switch", - entry.activeQueueCount - ); + this._scheduleContextClosureWhenIdle(entry.authIndex, "pre_cleanup_for_switch", entry.activeQueueCount); } if (toDefer.length > 0) { @@ -1927,6 +1926,10 @@ class BrowserManager { targets = new Set(ordered.slice(0, maxContexts)); } + for (const idx of targets) { + this._cancelPendingContextClosure(idx, "rebalance_target"); + } + // Remove contexts not in targets (except current) // Special handling: if current account is a duplicate (old version), also remove its canonical version // BUT only in limited mode - in unlimited mode, keep all contexts From f70af01e2476480e54c7c58ed7e558677ee894d2 Mon Sep 17 00:00:00 2001 From: bbbugg Date: Tue, 12 May 2026 16:31:06 +0800 Subject: [PATCH 3/3] fix: add system busy state handling to prevent context pool rebalance during high load --- src/core/BrowserManager.js | 23 +++++++++++++++++++++-- src/core/ProxyServerSystem.js | 1 + 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/core/BrowserManager.js b/src/core/BrowserManager.js index 4fa11ad..9406c86 100644 --- a/src/core/BrowserManager.js +++ b/src/core/BrowserManager.js @@ -56,6 +56,7 @@ class BrowserManager { // ConnectionRegistry reference (set after construction to avoid circular dependency) this.connectionRegistry = null; this._onAuthQueuesDrained = null; + this._isSystemBusyProvider = null; this.pendingContextClosures = new Map(); // Background wakeup service status (instance-level, tracks this.page) @@ -166,6 +167,19 @@ class BrowserManager { } } + setSystemBusyProvider(provider) { + this._isSystemBusyProvider = typeof provider === "function" ? provider : null; + } + + _isSystemBusy() { + try { + return this._isSystemBusyProvider?.() === true; + } catch (error) { + this.logger.warn(`[ContextPool] Failed to read system busy state: ${error.message}`); + return false; + } + } + /** * Helper: Check for page errors that require refresh * @returns {Object} Object with error flags @@ -1621,6 +1635,10 @@ class BrowserManager { `[ContextPool] Closing deferred context #${authIndex} now that all queues are drained (reason: ${pendingReason}).` ); await this.closeContext(authIndex); + if (this._isSystemBusy()) { + this.logger.info("[ContextPool] Skipping rebalance after deferred close because system is busy."); + return true; + } this.rebalanceContextPool().catch(error => { this.logger.error(`[ContextPool] Rebalance after deferred close failed: ${error.message}`); }); @@ -1985,8 +2003,9 @@ class BrowserManager { this._scheduleContextClosureWhenIdle(entry.authIndex, "rebalance", entry.activeQueueCount); } - // Preload candidates if we have room in the pool - if (candidates.length > 0 && (isUnlimited || this.contexts.size < maxContexts)) { + // Preload candidates if ready and initializing contexts still leave room in the pool + const poolOccupancy = this.contexts.size + this.initializingContexts.size; + if (candidates.length > 0 && (isUnlimited || poolOccupancy < maxContexts)) { this._preloadBackgroundContexts(candidates, isUnlimited ? 0 : maxContexts); } } diff --git a/src/core/ProxyServerSystem.js b/src/core/ProxyServerSystem.js index 5699a3c..5a34241 100644 --- a/src/core/ProxyServerSystem.js +++ b/src/core/ProxyServerSystem.js @@ -111,6 +111,7 @@ class ProxyServerSystem extends EventEmitter { this.config, this.authSource ); + this.browserManager.setSystemBusyProvider(() => this.requestHandler?.isSystemBusy === true); this.httpServer = null; this.wsServer = null;