Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 178 additions & 10 deletions src/core/BrowserManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class BrowserManager {

// ConnectionRegistry reference (set after construction to avoid circular dependency)
this.connectionRegistry = null;
this._onAuthQueuesDrained = null;
this._isSystemBusyProvider = null;
this.pendingContextClosures = new Map();

// Background wakeup service status (instance-level, tracks this.page)
// Prevents multiple BackgroundWakeup instances from running simultaneously
Expand Down Expand Up @@ -148,7 +151,33 @@ class BrowserManager {
* @param {ConnectionRegistry} connectionRegistry - The ConnectionRegistry instance
*/
setConnectionRegistry(connectionRegistry) {
if (this.connectionRegistry && this._onAuthQueuesDrained) {
this.connectionRegistry.off("authQueuesDrained", this._onAuthQueuesDrained);
}
this.connectionRegistry = connectionRegistry;
if (this.connectionRegistry) {
this._onAuthQueuesDrained = authIndex => {
this._closePendingContextIfIdle(authIndex).catch(error => {
this.logger.error(
`[ContextPool] Failed to close pending context #${authIndex} after queue drain: ${error.message}`
);
});
};
this.connectionRegistry.on("authQueuesDrained", this._onAuthQueuesDrained);
}
}

setSystemBusyProvider(provider) {
this._isSystemBusyProvider = typeof provider === "function" ? provider : null;
}

_isSystemBusy() {
try {
return this._isSystemBusyProvider?.() === true;
} catch (error) {
this.logger.warn(`[ContextPool] Failed to read system busy state: ${error.message}`);
return false;
}
}

/**
Expand Down Expand Up @@ -1532,6 +1561,112 @@ class BrowserManager {
});
}

_getActiveQueueCountForAuth(authIndex) {
if (!this.connectionRegistry?.getMessageQueueCountForAuth) {
return 0;
}
return this.connectionRegistry.getMessageQueueCountForAuth(authIndex);
}

_cancelPendingContextClosure(authIndex, reason = "closure_cancelled") {
if (!this.pendingContextClosures.has(authIndex)) {
return false;
}
this.pendingContextClosures.delete(authIndex);
this.logger.info(`[ContextPool] Cancelled pending close for context #${authIndex} (${reason}).`);
return true;
}

_scheduleContextClosureWhenIdle(authIndex, reason, activeQueueCount) {
if (!this.contexts.has(authIndex)) {
return false;
}
const existingReason = this.pendingContextClosures.get(authIndex);
if (existingReason) {
this.logger.debug(
`[ContextPool] Context #${authIndex} is already pending close (${existingReason}), active queues: ${activeQueueCount}`
);
return false;
}
this.pendingContextClosures.set(authIndex, reason);
this.logger.info(
`[ContextPool] Deferring close for busy context #${authIndex} (${activeQueueCount} active queue(s), reason: ${reason})`
);
return false;
}

async _closeContextForPoolIfPossible(authIndex, reason) {
const activeQueueCount = this._getActiveQueueCountForAuth(authIndex);
if (activeQueueCount > 0) {
return this._scheduleContextClosureWhenIdle(authIndex, reason, activeQueueCount);
}

this.pendingContextClosures.delete(authIndex);
await this.closeContext(authIndex);
return true;
}

async _closePendingContextIfIdle(authIndex) {
if (!this.pendingContextClosures.has(authIndex)) {
return false;
}
if (authIndex === this._currentAuthIndex) {
this.logger.debug(
`[ContextPool] Skipping pending close for context #${authIndex} because it is active again as current.`
);
return false;
}
if (!this.contexts.has(authIndex)) {
this.pendingContextClosures.delete(authIndex);
return false;
}

const activeQueueCount = this._getActiveQueueCountForAuth(authIndex);
if (activeQueueCount > 0) {
this.logger.debug(
`[ContextPool] Pending close for context #${authIndex} is still waiting on ${activeQueueCount} active queue(s).`
);
return false;
}

const pendingReason = this.pendingContextClosures.get(authIndex);
this.pendingContextClosures.delete(authIndex);
this.logger.info(
`[ContextPool] Closing deferred context #${authIndex} now that all queues are drained (reason: ${pendingReason}).`
);
await this.closeContext(authIndex);
if (this._isSystemBusy()) {
this.logger.info("[ContextPool] Skipping rebalance after deferred close because system is busy.");
return true;
}
this.rebalanceContextPool().catch(error => {
this.logger.error(`[ContextPool] Rebalance after deferred close failed: ${error.message}`);
});
return true;
}

async _flushPendingContextClosures() {
for (const authIndex of [...this.pendingContextClosures.keys()]) {
await this._closePendingContextIfIdle(authIndex);
}
}

_prioritizeContextsForRemoval(indices) {
const idle = [];
const busy = [];

for (const authIndex of indices) {
const activeQueueCount = this._getActiveQueueCountForAuth(authIndex);
if (activeQueueCount > 0) {
busy.push({ activeQueueCount, authIndex });
} else {
idle.push(authIndex);
}
}

Comment thread
bbbugg marked this conversation as resolved.
return { busy, idle };
}

/**
* Internal method to execute the actual preload task
* @private
Expand Down Expand Up @@ -1754,15 +1889,28 @@ class BrowserManager {
}
}

// Remove contexts according to priority until we have enough space
const toRemove = removalPriority.slice(0, removeCount);
const { busy, idle } = this._prioritizeContextsForRemoval(removalPriority);
const toRemoveNow = idle.slice(0, removeCount);
const toDefer = busy.slice(0, Math.max(0, removeCount - toRemoveNow.length));

this.logger.info(
`[ContextPool] Pre-cleanup: removing ${toRemove.length} contexts before switch to #${targetAuthIndex}: [${toRemove}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)`
`[ContextPool] Pre-cleanup before switch to #${targetAuthIndex}: immediate=[${toRemoveNow}], deferred=[${toDefer.map(
entry => `${entry.authIndex}:${entry.activeQueueCount}`
)}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)`
);

for (const idx of toRemove) {
await this.closeContext(idx);
for (const idx of toRemoveNow) {
await this._closeContextForPoolIfPossible(idx, "pre_cleanup_for_switch");
}

for (const entry of toDefer) {
this._scheduleContextClosureWhenIdle(entry.authIndex, "pre_cleanup_for_switch", entry.activeQueueCount);
}

if (toDefer.length > 0) {
this.logger.warn(
`[ContextPool] Allowing temporary MAX_CONTEXTS overflow while busy contexts drain before switch to #${targetAuthIndex}.`
);
}
}

Expand Down Expand Up @@ -1796,6 +1944,10 @@ class BrowserManager {
targets = new Set(ordered.slice(0, maxContexts));
}

for (const idx of targets) {
this._cancelPendingContextClosure(idx, "rebalance_target");
}

// Remove contexts not in targets (except current)
// Special handling: if current account is a duplicate (old version), also remove its canonical version
// BUT only in limited mode - in unlimited mode, keep all contexts
Expand Down Expand Up @@ -1835,16 +1987,25 @@ class BrowserManager {
// If a foreground task is running, _executePreloadTask will skip it (line 1382)
const candidates = ordered.filter(idx => !activeContexts.has(idx));

const { busy, idle } = this._prioritizeContextsForRemoval(toRemove);

this.logger.info(
`[ContextPool] Rebalance: targets=[${[...targets]}], remove=[${toRemove}], candidates=[${candidates}]`
`[ContextPool] Rebalance: targets=[${[...targets]}], removeNow=[${idle}], removeDeferred=[${busy.map(
entry => `${entry.authIndex}:${entry.activeQueueCount}`
)}], candidates=[${candidates}]`
);

for (const idx of toRemove) {
await this.closeContext(idx);
for (const idx of idle) {
await this._closeContextForPoolIfPossible(idx, "rebalance");
}

for (const entry of busy) {
this._scheduleContextClosureWhenIdle(entry.authIndex, "rebalance", entry.activeQueueCount);
}

// Preload candidates if we have room in the pool
if (candidates.length > 0 && (isUnlimited || this.contexts.size < maxContexts)) {
// Preload candidates if ready and initializing contexts still leave room in the pool
const poolOccupancy = this.contexts.size + this.initializingContexts.size;
if (candidates.length > 0 && (isUnlimited || poolOccupancy < maxContexts)) {
this._preloadBackgroundContexts(candidates, isUnlimited ? 0 : maxContexts);
}
}
Expand Down Expand Up @@ -2115,6 +2276,8 @@ class BrowserManager {
throw new Error(`Invalid authIndex: ${authIndex}. Must be >= 0.`);
}

this._cancelPendingContextClosure(authIndex, "context_reused");

// [Auth Switch] Save current auth data before switching
if (this.browser && this._currentAuthIndex >= 0 && this._currentAuthIndex !== authIndex) {
try {
Expand Down Expand Up @@ -2194,6 +2357,7 @@ class BrowserManager {

// Switch to new context
this._activateContext(contextData.context, contextData.page, authIndex);
await this._flushPendingContextClosures();

this.logger.info(`✅ [FastSwitch] Switched to account #${authIndex} instantly!`);
return;
Expand Down Expand Up @@ -2249,6 +2413,7 @@ class BrowserManager {
const { context, page } = await this._initializeContext(authIndex, false);

this._activateContext(context, page, authIndex);
await this._flushPendingContextClosures();

// If this account was marked as expired but login succeeded, restore it
if (this.authSource.isExpired(authIndex)) {
Expand Down Expand Up @@ -2456,6 +2621,8 @@ class BrowserManager {
* @param {number} authIndex - The auth index to close
*/
async closeContext(authIndex) {
this.pendingContextClosures.delete(authIndex);

// If context is being initialized in background, signal abort and wait
if (this.initializingContexts.has(authIndex)) {
this.logger.info(`[Browser] Context #${authIndex} is being initialized, marking for abort and waiting...`);
Expand Down Expand Up @@ -2549,6 +2716,7 @@ class BrowserManager {
this.contexts.clear();
this.initializingContexts.clear();
this.abortedContexts.clear();
this.pendingContextClosures.clear();
this._wsInitState.clear();
this.context = null;
this.page = null;
Expand Down
30 changes: 30 additions & 0 deletions src/core/ConnectionRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ class ConnectionRegistry extends EventEmitter {
// This prevents stale queues from lingering when retrying failed requests
const existingEntry = this.messageQueues.get(requestId);
if (existingEntry) {
const existingAuthIndex = existingEntry.authIndex;
this.logger.debug(
`[Registry] Found existing message queue for request ${requestId} (authIndex=${existingEntry.authIndex}), closing it before creating new one`
);
Expand All @@ -435,6 +436,7 @@ class ConnectionRegistry extends EventEmitter {
this.logger.debug(`[Registry] Failed to close existing queue for ${requestId}: ${e.message}`);
}
this.messageQueues.delete(requestId);
this._emitAuthQueuesDrainedIfNeeded(existingAuthIndex);
Comment thread
bbbugg marked this conversation as resolved.
}

const queue = new MessageQueue();
Expand All @@ -456,8 +458,10 @@ class ConnectionRegistry extends EventEmitter {
removeMessageQueue(requestId, reason = "handler_cleanup") {
const entry = this.messageQueues.get(requestId);
if (entry) {
const authIndex = entry.authIndex;
entry.queue.close(reason);
this.messageQueues.delete(requestId);
this._emitAuthQueuesDrainedIfNeeded(authIndex);
}
}

Expand All @@ -481,6 +485,21 @@ class ConnectionRegistry extends EventEmitter {
return entry ? entry.requestAttemptId || null : null;
}

/**
* Get the number of active message queues for a specific account
* @param {number} authIndex - The account index to inspect
* @returns {number} Number of active message queues
*/
getMessageQueueCountForAuth(authIndex) {
let count = 0;
for (const entry of this.messageQueues.values()) {
if (entry.authIndex === authIndex) {
count++;
}
}
return count;
Comment thread
bbbugg marked this conversation as resolved.
}

/**
* Close all message queues belonging to a specific account
* @param {number} authIndex - The account whose queues should be closed
Expand All @@ -505,6 +524,7 @@ class ConnectionRegistry extends EventEmitter {
`[Registry] Force closed ${count} pending message queue(s) for account #${authIndex} (reason: ${reason})`
);
}
this._emitAuthQueuesDrainedIfNeeded(authIndex);
return count;
}

Expand Down Expand Up @@ -548,6 +568,7 @@ class ConnectionRegistry extends EventEmitter {
this.logger.debug(`[Registry] Failed to close stale queue for ${requestId}: ${e.message}`);
}
this.messageQueues.delete(requestId);
this._emitAuthQueuesDrainedIfNeeded(entry.authIndex);
cleanedCount++;
}
}
Expand All @@ -559,6 +580,15 @@ class ConnectionRegistry extends EventEmitter {
return cleanedCount;
}

_emitAuthQueuesDrainedIfNeeded(authIndex) {
if (!Number.isInteger(authIndex) || authIndex < 0) {
return;
}
if (this.getMessageQueueCountForAuth(authIndex) === 0) {
this.emit("authQueuesDrained", authIndex);
}
}
Comment thread
bbbugg marked this conversation as resolved.

/**
* Safely close a WebSocket connection with readyState check
* @param {WebSocket} ws - The WebSocket to close
Expand Down
1 change: 1 addition & 0 deletions src/core/ProxyServerSystem.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class ProxyServerSystem extends EventEmitter {
this.config,
this.authSource
);
this.browserManager.setSystemBusyProvider(() => this.requestHandler?.isSystemBusy === true);

this.httpServer = null;
this.wsServer = null;
Expand Down
Loading