Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 175 additions & 10 deletions src/core/BrowserManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class BrowserManager {

// ConnectionRegistry reference (set after construction to avoid circular dependency)
this.connectionRegistry = null;
this._onAuthQueuesDrained = null;
this._isSystemBusyProvider = null;
this.pendingContextClosures = new Map();

// Background wakeup service status (instance-level, tracks this.page)
// Prevents multiple BackgroundWakeup instances from running simultaneously
Expand Down Expand Up @@ -148,7 +151,33 @@ class BrowserManager {
* @param {ConnectionRegistry} connectionRegistry - The ConnectionRegistry instance
*/
setConnectionRegistry(connectionRegistry) {
if (this.connectionRegistry && this._onAuthQueuesDrained) {
this.connectionRegistry.off("authQueuesDrained", this._onAuthQueuesDrained);
}
this.connectionRegistry = connectionRegistry;
if (this.connectionRegistry) {
this._onAuthQueuesDrained = authIndex => {
this._closePendingContextIfIdle(authIndex).catch(error => {
this.logger.error(
`[ContextPool] Failed to close pending context #${authIndex} after queue drain: ${error.message}`
);
});
};
this.connectionRegistry.on("authQueuesDrained", this._onAuthQueuesDrained);
}
}

setSystemBusyProvider(provider) {
this._isSystemBusyProvider = typeof provider === "function" ? provider : null;
}

_isSystemBusy() {
try {
return this._isSystemBusyProvider?.() === true;
} catch (error) {
this.logger.warn(`[ContextPool] Failed to read system busy state: ${error.message}`);
return false;
}
}

/**
Expand Down Expand Up @@ -1532,6 +1561,109 @@ class BrowserManager {
});
}

_hasActiveQueueForAuth(authIndex) {
if (this.connectionRegistry?.hasMessageQueueForAuth) {
return this.connectionRegistry.hasMessageQueueForAuth(authIndex);
}
return false;
}

_cancelPendingContextClosure(authIndex, reason = "closure_cancelled") {
if (!this.pendingContextClosures.has(authIndex)) {
return false;
}
this.pendingContextClosures.delete(authIndex);
this.logger.info(`[ContextPool] Cancelled pending close for context #${authIndex} (${reason}).`);
return true;
}

_scheduleContextClosureWhenIdle(authIndex, reason) {
if (!this.contexts.has(authIndex)) {
return false;
}
const existingReason = this.pendingContextClosures.get(authIndex);
if (existingReason) {
this.logger.debug(
`[ContextPool] Context #${authIndex} is already pending close (${existingReason}), active queues are still present`
);
return false;
}
this.pendingContextClosures.set(authIndex, reason);
this.logger.info(
`[ContextPool] Deferring close for busy context #${authIndex} (active queue(s) present, reason: ${reason})`
);
return false;
}

async _closeContextForPoolIfPossible(authIndex, reason) {
if (this._hasActiveQueueForAuth(authIndex)) {
return this._scheduleContextClosureWhenIdle(authIndex, reason);
}

this.pendingContextClosures.delete(authIndex);
await this.closeContext(authIndex);
return true;
}

async _closePendingContextIfIdle(authIndex) {
if (!this.pendingContextClosures.has(authIndex)) {
return false;
}
if (authIndex === this._currentAuthIndex) {
this.logger.debug(
`[ContextPool] Skipping pending close for context #${authIndex} because it is active again as current.`
);
return false;
}
if (!this.contexts.has(authIndex)) {
this.pendingContextClosures.delete(authIndex);
return false;
}

if (this._hasActiveQueueForAuth(authIndex)) {
this.logger.debug(
`[ContextPool] Pending close for context #${authIndex} is still waiting on active queue(s).`
);
return false;
}

const pendingReason = this.pendingContextClosures.get(authIndex);
this.pendingContextClosures.delete(authIndex);
this.logger.info(
`[ContextPool] Closing deferred context #${authIndex} now that all queues are drained (reason: ${pendingReason}).`
);
await this.closeContext(authIndex);
if (this._isSystemBusy()) {
this.logger.info("[ContextPool] Skipping rebalance after deferred close because system is busy.");
return true;
}
this.rebalanceContextPool().catch(error => {
this.logger.error(`[ContextPool] Rebalance after deferred close failed: ${error.message}`);
});
Comment thread
bbbugg marked this conversation as resolved.
return true;
}

async _flushPendingContextClosures() {
for (const authIndex of [...this.pendingContextClosures.keys()]) {
await this._closePendingContextIfIdle(authIndex);
Comment thread
bbbugg marked this conversation as resolved.
}
}

_prioritizeContextsForRemoval(indices) {
const idle = [];
const busy = [];

for (const authIndex of indices) {
if (this._hasActiveQueueForAuth(authIndex)) {
busy.push({ authIndex });
} else {
idle.push(authIndex);
}
}

return { busy, idle };
Comment thread
bbbugg marked this conversation as resolved.
}

/**
* Internal method to execute the actual preload task
* @private
Expand Down Expand Up @@ -1754,15 +1886,28 @@ class BrowserManager {
}
}

// Remove contexts according to priority until we have enough space
const toRemove = removalPriority.slice(0, removeCount);
const { busy, idle } = this._prioritizeContextsForRemoval(removalPriority);
const toRemoveNow = idle.slice(0, removeCount);
const toDefer = busy.slice(0, Math.max(0, removeCount - toRemoveNow.length));

this.logger.info(
`[ContextPool] Pre-cleanup: removing ${toRemove.length} contexts before switch to #${targetAuthIndex}: [${toRemove}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)`
`[ContextPool] Pre-cleanup before switch to #${targetAuthIndex}: immediate=[${toRemoveNow}], deferred=[${toDefer.map(
entry => entry.authIndex
)}] (${this.contexts.size} ready + ${this.initializingContexts.size} initializing)`
);

for (const idx of toRemove) {
await this.closeContext(idx);
for (const idx of toRemoveNow) {
await this._closeContextForPoolIfPossible(idx, "pre_cleanup_for_switch");
}

for (const entry of toDefer) {
this._scheduleContextClosureWhenIdle(entry.authIndex, "pre_cleanup_for_switch");
}

if (toDefer.length > 0) {
this.logger.warn(
`[ContextPool] Allowing temporary MAX_CONTEXTS overflow while busy contexts drain before switch to #${targetAuthIndex}.`
);
}
Comment thread
bbbugg marked this conversation as resolved.
}

Expand Down Expand Up @@ -1796,6 +1941,10 @@ class BrowserManager {
targets = new Set(ordered.slice(0, maxContexts));
}

for (const idx of targets) {
this._cancelPendingContextClosure(idx, "rebalance_target");
}

// Remove contexts not in targets (except current)
// Special handling: if current account is a duplicate (old version), also remove its canonical version
// BUT only in limited mode - in unlimited mode, keep all contexts
Expand Down Expand Up @@ -1835,16 +1984,25 @@ class BrowserManager {
// If a foreground task is running, _executePreloadTask will skip it (line 1382)
const candidates = ordered.filter(idx => !activeContexts.has(idx));

const { busy, idle } = this._prioritizeContextsForRemoval(toRemove);

this.logger.info(
`[ContextPool] Rebalance: targets=[${[...targets]}], remove=[${toRemove}], candidates=[${candidates}]`
`[ContextPool] Rebalance: targets=[${[...targets]}], removeNow=[${idle}], removeDeferred=[${busy.map(
entry => entry.authIndex
)}], candidates=[${candidates}]`
);

for (const idx of toRemove) {
await this.closeContext(idx);
for (const idx of idle) {
await this._closeContextForPoolIfPossible(idx, "rebalance");
}

for (const entry of busy) {
this._scheduleContextClosureWhenIdle(entry.authIndex, "rebalance");
}

// Preload candidates if we have room in the pool
if (candidates.length > 0 && (isUnlimited || this.contexts.size < maxContexts)) {
// Preload candidates if ready and initializing contexts still leave room in the pool
const poolOccupancy = this.contexts.size + this.initializingContexts.size;
if (candidates.length > 0 && (isUnlimited || poolOccupancy < maxContexts)) {
this._preloadBackgroundContexts(candidates, isUnlimited ? 0 : maxContexts);
}
}
Expand Down Expand Up @@ -2115,6 +2273,8 @@ class BrowserManager {
throw new Error(`Invalid authIndex: ${authIndex}. Must be >= 0.`);
}

this._cancelPendingContextClosure(authIndex, "context_reused");

// [Auth Switch] Save current auth data before switching
if (this.browser && this._currentAuthIndex >= 0 && this._currentAuthIndex !== authIndex) {
try {
Expand Down Expand Up @@ -2194,6 +2354,7 @@ class BrowserManager {

// Switch to new context
this._activateContext(contextData.context, contextData.page, authIndex);
await this._flushPendingContextClosures();
Comment thread
bbbugg marked this conversation as resolved.
Comment thread
bbbugg marked this conversation as resolved.

this.logger.info(`✅ [FastSwitch] Switched to account #${authIndex} instantly!`);
return;
Expand Down Expand Up @@ -2249,6 +2410,7 @@ class BrowserManager {
const { context, page } = await this._initializeContext(authIndex, false);

this._activateContext(context, page, authIndex);
await this._flushPendingContextClosures();
Comment thread
bbbugg marked this conversation as resolved.

// If this account was marked as expired but login succeeded, restore it
if (this.authSource.isExpired(authIndex)) {
Expand Down Expand Up @@ -2456,6 +2618,8 @@ class BrowserManager {
* @param {number} authIndex - The auth index to close
*/
async closeContext(authIndex) {
this.pendingContextClosures.delete(authIndex);

// If context is being initialized in background, signal abort and wait
if (this.initializingContexts.has(authIndex)) {
this.logger.info(`[Browser] Context #${authIndex} is being initialized, marking for abort and waiting...`);
Expand Down Expand Up @@ -2549,6 +2713,7 @@ class BrowserManager {
this.contexts.clear();
this.initializingContexts.clear();
this.abortedContexts.clear();
this.pendingContextClosures.clear();
this._wsInitState.clear();
this.context = null;
this.page = null;
Expand Down
39 changes: 38 additions & 1 deletion src/core/ConnectionRegistry.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,19 @@ class ConnectionRegistry extends EventEmitter {

getConnectionByAuth(authIndex, log = true) {
const connection = this.connectionsByAuth.get(authIndex);
if (connection && log) {

if (!log) {
return connection;
}

if (connection) {
this.logger.debug(`[Registry] Found WebSocket connection for authIndex=${authIndex}`);
} else if (this.logger.getLevel?.() === "DEBUG") {
this.logger.debug(
`[Registry] No WebSocket connection found for authIndex=${authIndex}. Available: [${Array.from(this.connectionsByAuth.keys()).join(", ")}]`
);
}

return connection;
}

Expand Down Expand Up @@ -426,6 +432,7 @@ class ConnectionRegistry extends EventEmitter {
// This prevents stale queues from lingering when retrying failed requests
const existingEntry = this.messageQueues.get(requestId);
if (existingEntry) {
const existingAuthIndex = existingEntry.authIndex;
this.logger.debug(
`[Registry] Found existing message queue for request ${requestId} (authIndex=${existingEntry.authIndex}), closing it before creating new one`
);
Expand All @@ -435,6 +442,9 @@ class ConnectionRegistry extends EventEmitter {
this.logger.debug(`[Registry] Failed to close existing queue for ${requestId}: ${e.message}`);
}
this.messageQueues.delete(requestId);
if (existingAuthIndex !== authIndex) {
this._emitAuthQueuesDrainedIfNeeded(existingAuthIndex);
}
}

const queue = new MessageQueue();
Expand All @@ -456,8 +466,10 @@ class ConnectionRegistry extends EventEmitter {
removeMessageQueue(requestId, reason = "handler_cleanup") {
const entry = this.messageQueues.get(requestId);
if (entry) {
const authIndex = entry.authIndex;
entry.queue.close(reason);
this.messageQueues.delete(requestId);
this._emitAuthQueuesDrainedIfNeeded(authIndex);
}
}

Expand All @@ -481,6 +493,20 @@ class ConnectionRegistry extends EventEmitter {
return entry ? entry.requestAttemptId || null : null;
}

/**
* Check whether a specific account has any active message queue.
* @param {number} authIndex - The account index to inspect
* @returns {boolean} True if at least one active message queue belongs to the account
*/
hasMessageQueueForAuth(authIndex) {
for (const entry of this.messageQueues.values()) {
if (entry.authIndex === authIndex) {
return true;
}
}
return false;
Comment thread
bbbugg marked this conversation as resolved.
}
Comment thread
bbbugg marked this conversation as resolved.
Comment thread
bbbugg marked this conversation as resolved.
Comment thread
bbbugg marked this conversation as resolved.
Comment thread
bbbugg marked this conversation as resolved.
Comment thread
bbbugg marked this conversation as resolved.
Comment thread
bbbugg marked this conversation as resolved.

/**
* Close all message queues belonging to a specific account
* @param {number} authIndex - The account whose queues should be closed
Expand All @@ -504,6 +530,7 @@ class ConnectionRegistry extends EventEmitter {
this.logger.info(
`[Registry] Force closed ${count} pending message queue(s) for account #${authIndex} (reason: ${reason})`
);
this._emitAuthQueuesDrainedIfNeeded(authIndex);
}
return count;
}
Expand Down Expand Up @@ -548,6 +575,7 @@ class ConnectionRegistry extends EventEmitter {
this.logger.debug(`[Registry] Failed to close stale queue for ${requestId}: ${e.message}`);
}
this.messageQueues.delete(requestId);
this._emitAuthQueuesDrainedIfNeeded(entry.authIndex);
cleanedCount++;
}
}
Expand All @@ -559,6 +587,15 @@ class ConnectionRegistry extends EventEmitter {
return cleanedCount;
}

_emitAuthQueuesDrainedIfNeeded(authIndex) {
if (!Number.isInteger(authIndex) || authIndex < 0) {
return;
}
if (!this.hasMessageQueueForAuth(authIndex)) {
this.emit("authQueuesDrained", authIndex);
}
}
Comment thread
bbbugg marked this conversation as resolved.

/**
* Safely close a WebSocket connection with readyState check
* @param {WebSocket} ws - The WebSocket to close
Expand Down
1 change: 1 addition & 0 deletions src/core/ProxyServerSystem.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class ProxyServerSystem extends EventEmitter {
this.config,
this.authSource
);
this.browserManager.setSystemBusyProvider(() => this.requestHandler?.isSystemBusy === true);

this.httpServer = null;
this.wsServer = null;
Expand Down
Loading
Loading