diff --git a/lib/codex-manager/settings-hub.ts b/lib/codex-manager/settings-hub.ts index fc999303..e50a06d9 100644 --- a/lib/codex-manager/settings-hub.ts +++ b/lib/codex-manager/settings-hub.ts @@ -186,6 +186,7 @@ type ThemeConfigAction = type BackendToggleSettingKey = | "liveAccountSync" + | "codexCliSessionSupervisor" | "sessionAffinity" | "proactiveRefreshGuardian" | "retryAllAccountsRateLimited" @@ -272,6 +273,7 @@ type SettingsHubAction = | { type: "back" }; type ExperimentalSettingsAction = + | { type: "toggle-session-supervisor" } | { type: "sync" } | { type: "backup" } | { type: "toggle-refresh-guardian" } @@ -300,9 +302,10 @@ function getExperimentalSelectOptions( function mapExperimentalMenuHotkey( raw: string, ): ExperimentalSettingsAction | undefined { - if (raw === "1") return { type: "sync" }; - if (raw === "2") return { type: "backup" }; - if (raw === "3") return { type: "toggle-refresh-guardian" }; + if (raw === "1") return { type: "toggle-session-supervisor" }; + if (raw === "2") return { type: "sync" }; + if (raw === "3") return { type: "backup" }; + if (raw === "4") return { type: "toggle-refresh-guardian" }; if (raw === "[" || raw === "-") return { type: "decrease-refresh-interval" }; if (raw === "]" || raw === "+") return { type: "increase-refresh-interval" }; const lower = raw.toLowerCase(); @@ -323,6 +326,11 @@ const BACKEND_TOGGLE_OPTIONS: BackendToggleSettingOption[] = [ label: "Enable Live Sync", description: "Keep accounts synced when files change in another window.", }, + { + key: "codexCliSessionSupervisor", + label: "Enable Session Resume Supervisor", + description: "Wrap interactive Codex sessions so they can relaunch with resume after rotation.", + }, { key: "sessionAffinity", label: "Enable Session Affinity", @@ -2568,6 +2576,11 @@ async function promptExperimentalSettings( while (true) { const action = await select( [ + { + label: `${formatDashboardSettingState(draft.codexCliSessionSupervisor ?? BACKEND_DEFAULTS.codexCliSessionSupervisor ?? false)} ${UI_COPY.settings.experimentalSessionSupervisor}`, + value: { type: "toggle-session-supervisor" }, + color: "yellow", + }, { label: UI_COPY.settings.experimentalSync, value: { type: "sync" }, @@ -2619,6 +2632,17 @@ async function promptExperimentalSettings( ); if (!action || action.type === "back") return null; if (action.type === "save") return draft; + if (action.type === "toggle-session-supervisor") { + draft = { + ...draft, + codexCliSessionSupervisor: !( + draft.codexCliSessionSupervisor ?? + BACKEND_DEFAULTS.codexCliSessionSupervisor ?? + false + ), + }; + continue; + } if (action.type === "toggle-refresh-guardian") { draft = { ...draft, diff --git a/lib/config.ts b/lib/config.ts index f9e7ecf8..4d63bd8e 100644 --- a/lib/config.ts +++ b/lib/config.ts @@ -145,6 +145,7 @@ export const DEFAULT_PLUGIN_CONFIG: PluginConfig = { liveAccountSync: true, liveAccountSyncDebounceMs: 250, liveAccountSyncPollMs: 2_000, + codexCliSessionSupervisor: false, sessionAffinity: true, sessionAffinityTtlMs: 20 * 60_000, sessionAffinityMaxEntries: 512, @@ -857,6 +858,25 @@ export function getLiveAccountSyncPollMs(pluginConfig: PluginConfig): number { ); } +/** + * Determines whether the CLI session supervisor wrapper is enabled. + * + * This accessor is synchronous, side-effect free, and safe for concurrent reads. + * It performs no filesystem I/O and does not expose token material. + * + * @param pluginConfig - The plugin configuration object used as the non-environment fallback + * @returns `true` when the session supervisor should wrap interactive Codex sessions + */ +export function getCodexCliSessionSupervisor( + pluginConfig: PluginConfig, +): boolean { + return resolveBooleanSetting( + "CODEX_AUTH_CLI_SESSION_SUPERVISOR", + pluginConfig.codexCliSessionSupervisor, + false, + ); +} + /** * Indicates whether session affinity is enabled. * diff --git a/lib/quota-probe.ts b/lib/quota-probe.ts index 9535b7f9..b93675a7 100644 --- a/lib/quota-probe.ts +++ b/lib/quota-probe.ts @@ -305,6 +305,13 @@ export interface ProbeCodexQuotaOptions { model?: string; fallbackModels?: readonly string[]; timeoutMs?: number; + signal?: AbortSignal; +} + +function createAbortError(message: string): Error { + const error = new Error(message); + error.name = "AbortError"; + return error; } /** @@ -331,6 +338,9 @@ export async function fetchCodexQuotaSnapshot( let lastError: Error | null = null; for (const model of models) { + if (options.signal?.aborted) { + throw createAbortError("Quota probe aborted"); + } try { const instructions = await getCodexInstructions(model); const probeBody: RequestBody = { @@ -356,6 +366,12 @@ export async function fetchCodexQuotaSnapshot( headers.set("content-type", "application/json"); const controller = new AbortController(); + const onAbort = () => controller.abort(options.signal?.reason); + if (options.signal?.aborted) { + controller.abort(options.signal.reason); + } else { + options.signal?.addEventListener("abort", onAbort, { once: true }); + } const timeout = setTimeout(() => controller.abort(), timeoutMs); let response: Response; try { @@ -367,6 +383,7 @@ export async function fetchCodexQuotaSnapshot( }); } finally { clearTimeout(timeout); + options.signal?.removeEventListener("abort", onAbort); } const snapshotBase = parseQuotaSnapshotBase(response.headers, response.status); @@ -406,9 +423,16 @@ export async function fetchCodexQuotaSnapshot( } lastError = new Error("Codex response did not include quota headers"); } catch (error) { + if (options.signal?.aborted) { + throw error instanceof Error ? error : createAbortError("Quota probe aborted"); + } lastError = error instanceof Error ? error : new Error(String(error)); } } + if (options.signal?.aborted) { + throw createAbortError("Quota probe aborted"); + } + throw lastError ?? new Error("Failed to fetch quotas"); } diff --git a/lib/schemas.ts b/lib/schemas.ts index 1ab18caa..452e39e1 100644 --- a/lib/schemas.ts +++ b/lib/schemas.ts @@ -44,6 +44,7 @@ export const PluginConfigSchema = z.object({ liveAccountSync: z.boolean().optional(), liveAccountSyncDebounceMs: z.number().min(50).optional(), liveAccountSyncPollMs: z.number().min(500).optional(), + codexCliSessionSupervisor: z.boolean().optional(), sessionAffinity: z.boolean().optional(), sessionAffinityTtlMs: z.number().min(1_000).optional(), sessionAffinityMaxEntries: z.number().min(8).optional(), diff --git a/lib/ui/copy.ts b/lib/ui/copy.ts index b40f7d8f..81256893 100644 --- a/lib/ui/copy.ts +++ b/lib/ui/copy.ts @@ -95,9 +95,10 @@ export const UI_COPY = { experimentalTitle: "Experimental", experimentalSubtitle: "Preview sync and backup actions before they become stable", experimentalHelpMenu: - "Enter Select | 1 Sync | 2 Backup | 3 Guard | [ - Down | ] + Up | S Save | Q Back", + "Enter Select | 1 Supervisor | 2 Sync | 3 Backup | 4 Guard | [ - Down | ] + Up | S Save | Q Back", experimentalHelpPreview: "Enter Select | A Apply | Q Back", experimentalHelpStatus: "Enter Select | Q Back", + experimentalSessionSupervisor: "Enable Session Resume Supervisor", experimentalSync: "Sync Accounts to oc-chatgpt-multi-auth", experimentalApplySync: "Apply Sync", experimentalBackup: "Save Pool Backup", diff --git a/package.json b/package.json index e4bae7b8..bee82a7a 100644 --- a/package.json +++ b/package.json @@ -51,6 +51,7 @@ "test:model-matrix": "node scripts/test-model-matrix.js", "test:model-matrix:smoke": "node scripts/test-model-matrix.js --smoke", "test:model-matrix:report": "node scripts/test-model-matrix.js --smoke --report-json=.tmp/model-matrix-report.json", + "test:session-supervisor:smoke": "vitest run test/codex-supervisor.test.ts test/codex-bin-wrapper.test.ts test/plugin-config.test.ts test/quota-probe.test.ts test/settings-hub-utils.test.ts", "clean:repo": "node scripts/repo-hygiene.js clean --mode aggressive", "clean:repo:check": "node scripts/repo-hygiene.js check", "bench:edit-formats": "node scripts/benchmark-edit-formats.mjs --preset=codex-core", diff --git a/scripts/codex-routing.js b/scripts/codex-routing.js index 297bf78d..1068a056 100644 --- a/scripts/codex-routing.js +++ b/scripts/codex-routing.js @@ -12,6 +12,8 @@ const AUTH_SUBCOMMANDS = new Set([ "fix", "doctor", ]); +const COMMAND_FLAGS_WITH_VALUE = new Set(["-c", "--config"]); +const HELP_OR_VERSION_FLAGS = new Set(["--help", "-h", "--version"]); export function normalizeAuthAlias(args) { if (args.length >= 2 && args[0] === "multi" && args[1] === "auth") { @@ -32,4 +34,93 @@ export function shouldHandleMultiAuthAuth(args) { return AUTH_SUBCOMMANDS.has(subcommand); } +export function findPrimaryCodexCommand(args) { + let expectFlagValue = false; + let stopOptionParsing = false; + + for (let index = 0; index < args.length; index += 1) { + const normalizedArg = `${args[index] ?? ""}`.trim().toLowerCase(); + if (normalizedArg.length === 0) { + continue; + } + if (expectFlagValue) { + expectFlagValue = false; + continue; + } + if (!stopOptionParsing && normalizedArg === "--") { + stopOptionParsing = true; + continue; + } + if (!stopOptionParsing) { + if (COMMAND_FLAGS_WITH_VALUE.has(normalizedArg)) { + expectFlagValue = true; + continue; + } + if (normalizedArg.startsWith("--config=")) { + continue; + } + if (normalizedArg.startsWith("-")) { + continue; + } + } + return { + command: normalizedArg, + index, + }; + } + + return null; +} + +export function hasTopLevelHelpOrVersionFlag(args) { + let expectFlagValue = false; + + for (let index = 0; index < args.length; index += 1) { + const normalizedArg = `${args[index] ?? ""}`.trim().toLowerCase(); + if (normalizedArg.length === 0) { + continue; + } + if (expectFlagValue) { + expectFlagValue = false; + continue; + } + if (normalizedArg === "--") { + return false; + } + if (HELP_OR_VERSION_FLAGS.has(normalizedArg)) { + return true; + } + if (COMMAND_FLAGS_WITH_VALUE.has(normalizedArg)) { + expectFlagValue = true; + continue; + } + if (normalizedArg.startsWith("--config=")) { + continue; + } + if (normalizedArg.startsWith("-")) { + continue; + } + return false; + } + + return false; +} + +export function splitCodexCommandArgs(args) { + const primaryCommand = findPrimaryCodexCommand(args); + if (!primaryCommand) { + return { + leadingArgs: [...args], + command: null, + trailingArgs: [], + }; + } + + return { + leadingArgs: args.slice(0, primaryCommand.index), + command: primaryCommand.command, + trailingArgs: args.slice(primaryCommand.index + 1), + }; +} + export { AUTH_SUBCOMMANDS }; diff --git a/scripts/codex-supervisor.js b/scripts/codex-supervisor.js new file mode 100644 index 00000000..3b254c03 --- /dev/null +++ b/scripts/codex-supervisor.js @@ -0,0 +1,2251 @@ +import { spawn } from "node:child_process"; +import { createReadStream, promises as fs } from "node:fs"; +import { homedir } from "node:os"; +import { dirname, join, resolve as resolvePath } from "node:path"; +import process from "node:process"; +import { createInterface } from "node:readline"; +import { + findPrimaryCodexCommand, + hasTopLevelHelpOrVersionFlag, + splitCodexCommandArgs, +} from "./codex-routing.js"; + +const DEFAULT_POLL_MS = 300; +const DEFAULT_IDLE_MS = 250; +const DEFAULT_SESSION_CAPTURE_TIMEOUT_MS = 1_500; +const DEFAULT_SIGNAL_TIMEOUT_MS = process.platform === "win32" ? 75 : 350; +const DEFAULT_QUOTA_PROBE_TIMEOUT_MS = 4_000; +const DEFAULT_MONITOR_PROBE_TIMEOUT_MS = 1_250; +const DEFAULT_SELECTION_PROBE_TIMEOUT_MS = 2_500; +const DEFAULT_SELECTION_PROBE_BATCH_SIZE = 4; +const DEFAULT_SNAPSHOT_CACHE_TTL_MS = 1_500; +const DEFAULT_PREWARM_MARGIN_PERCENT_5H = 5; +const DEFAULT_PREWARM_MARGIN_PERCENT_7D = 3; +const DEFAULT_SESSION_BINDING_POLL_MS = 50; +const DEFAULT_STORAGE_LOCK_WAIT_MS = 10_000; +const DEFAULT_STORAGE_LOCK_POLL_MS = 100; +const DEFAULT_STORAGE_LOCK_TTL_MS = 30_000; +const DEFAULT_UNLINK_RETRY_ATTEMPTS = 4; +const DEFAULT_UNLINK_RETRY_BASE_DELAY_MS = 25; +const INTERNAL_RECOVERABLE_COOLDOWN_MS = 60_000; +const SESSION_ID_PATTERN = /^[A-Za-z0-9_][A-Za-z0-9_-]{0,127}$/; +const SESSION_META_SCAN_LINE_LIMIT = 200; +const MAX_ACCOUNT_SELECTION_ATTEMPTS = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_MAX_ACCOUNT_SELECTION_ATTEMPTS", + 32, + 1, +); +const MAX_SESSION_RESTARTS = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_MAX_RESTARTS", + 16, + 1, +); +const CODEX_FAMILY = "codex"; +const snapshotProbeCache = new Map(); +const sessionRolloutPathById = new Map(); + +function sleep(ms, signal) { + return new Promise((resolve) => { + if (signal?.aborted) { + resolve(false); + return; + } + + let settled = false; + const finish = (completed) => { + if (settled) return; + settled = true; + clearTimeout(timer); + signal?.removeEventListener("abort", onAbort); + resolve(completed); + }; + const onAbort = () => finish(false); + const timer = setTimeout(() => finish(true), ms); + if (typeof timer.unref === "function") { + timer.unref(); + } + + signal?.addEventListener("abort", onAbort, { once: true }); + }); +} + +function createAbortError(message = "Operation aborted") { + const error = new Error(message); + error.name = "AbortError"; + return error; +} + +function createProbeUnavailableError(error) { + const wrapped = new Error( + error instanceof Error ? error.message : String(error), + { cause: error }, + ); + wrapped.name = "QuotaProbeUnavailableError"; + return wrapped; +} + +function throwIfAborted(signal, message = "Operation aborted") { + if (signal?.aborted) { + throw createAbortError(message); + } +} + +function abortablePromise(promise, signal, message = "Operation aborted") { + if (!signal) return promise; + if (signal.aborted) { + return Promise.reject(createAbortError(message)); + } + + let onAbort; + const cleanup = () => { + if (onAbort) { + signal.removeEventListener("abort", onAbort); + } + }; + return Promise.race([ + Promise.resolve(promise).finally(cleanup), + new Promise((_, reject) => { + onAbort = () => { + cleanup(); + reject(createAbortError(message)); + }; + signal.addEventListener("abort", onAbort, { once: true }); + }), + ]); +} + +function parseBooleanEnv(name, fallback) { + const raw = (process.env[name] ?? "").trim().toLowerCase(); + if (raw === "1" || raw === "true") return true; + if (raw === "0" || raw === "false") return false; + return fallback; +} + +function parseNumberEnv(name, fallback, min = 0) { + const raw = Number(process.env[name]); + if (!Number.isFinite(raw)) return fallback; + return Math.max(min, Math.trunc(raw)); +} + +function resolveProbeTimeoutMs(name, fallback) { + const globalFallback = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_PROBE_TIMEOUT_MS", + fallback, + 1_000, + ); + return parseNumberEnv(name, globalFallback, 1_000); +} + +function resolveCodexHomeDir() { + const fromEnv = (process.env.CODEX_HOME ?? "").trim(); + if (fromEnv.length > 0) return fromEnv; + return join(homedir(), ".codex"); +} + +function getSessionsRootDir() { + const override = (process.env.CODEX_MULTI_AUTH_CLI_SESSIONS_DIR ?? "").trim(); + if (override.length > 0) return override; + return join(resolveCodexHomeDir(), "sessions"); +} + +export function isInteractiveCommand(rawArgs) { + const command = findPrimaryCodexCommand(rawArgs)?.command; + return !command || command === "resume" || command === "fork"; +} + +function isNonInteractiveCommand(rawArgs) { + return !isInteractiveCommand(rawArgs); +} + +function isSupervisorAccountGateBypassCommand(rawArgs) { + if (hasTopLevelHelpOrVersionFlag(rawArgs)) { + return true; + } + + const primaryCommand = findPrimaryCodexCommand(rawArgs)?.command; + return ( + primaryCommand === "auth" || + primaryCommand === "help" || + primaryCommand === "version" + ); +} + +function readResumeSessionId(rawArgs) { + const primaryCommand = findPrimaryCodexCommand(rawArgs); + if (primaryCommand?.command !== "resume") return null; + const sessionId = `${rawArgs[primaryCommand.index + 1] ?? ""}`.trim(); + return isValidSessionId(sessionId) ? sessionId : null; +} + +function rememberSessionBinding(binding) { + if (!binding?.sessionId || !binding?.rolloutPath) return; + sessionRolloutPathById.set(binding.sessionId, binding.rolloutPath); +} + +function clearSessionBindingPathCache() { + sessionRolloutPathById.clear(); +} + +function createLinkedAbortController(parentSignal) { + const controller = new AbortController(); + if (!parentSignal) { + return { + controller, + cleanup: () => controller.abort(), + }; + } + const onParentAbort = () => controller.abort(); + parentSignal.addEventListener("abort", onParentAbort, { once: true }); + return { + controller, + cleanup: () => { + parentSignal.removeEventListener("abort", onParentAbort); + controller.abort(); + }, + }; +} + +function getSessionBindingEntryPasses(entries, sinceMs, sessionId, hasKnownRolloutPath) { + const sortedEntries = [...entries].sort((left, right) => right.mtimeMs - left.mtimeMs); + const recentEntries = sortedEntries.filter((entry) => entry.mtimeMs >= sinceMs - 2_000); + if (!sessionId || hasKnownRolloutPath) { + return [recentEntries]; + } + if (recentEntries.length === 0 || recentEntries.length === sortedEntries.length) { + return [sortedEntries]; + } + return [recentEntries, sortedEntries]; +} + +async function importIfPresent(specifier) { + try { + return await import(specifier); + } catch (error) { + if ( + error && + typeof error === "object" && + "code" in error && + error.code === "ERR_MODULE_NOT_FOUND" + ) { + return null; + } + throw error; + } +} + +async function loadSupervisorRuntime() { + const [configModule, accountsModule, quotaModule, storageModule] = + await Promise.all([ + importIfPresent("../dist/lib/config.js"), + importIfPresent("../dist/lib/accounts.js"), + importIfPresent("../dist/lib/quota-probe.js"), + importIfPresent("../dist/lib/storage.js"), + ]); + + if ( + !configModule || + !accountsModule?.AccountManager || + !quotaModule?.fetchCodexQuotaSnapshot + ) { + return null; + } + + const configAccessors = createRuntimeConfigAccessors(configModule); + + return { + loadPluginConfig: configModule.loadPluginConfig, + ...configAccessors, + AccountManager: accountsModule.AccountManager, + fetchCodexQuotaSnapshot: quotaModule.fetchCodexQuotaSnapshot, + getStoragePath: storageModule?.getStoragePath, + }; +} + +function createRuntimeConfigAccessors(configModule) { + return { + getCodexCliSessionSupervisor: + configModule?.getCodexCliSessionSupervisor ?? + ((pluginConfig) => + parseBooleanEnv( + "CODEX_AUTH_CLI_SESSION_SUPERVISOR", + pluginConfig?.codexCliSessionSupervisor ?? false, + )), + getRetryAllAccountsRateLimited: + configModule?.getRetryAllAccountsRateLimited ?? + ((pluginConfig) => + parseBooleanEnv( + "CODEX_AUTH_RETRY_ALL_RATE_LIMITED", + pluginConfig?.retryAllAccountsRateLimited !== false, + )), + getPreemptiveQuotaEnabled: + configModule?.getPreemptiveQuotaEnabled ?? + ((pluginConfig) => + parseBooleanEnv( + "CODEX_AUTH_PREEMPTIVE_QUOTA_ENABLED", + pluginConfig?.preemptiveQuotaEnabled !== false, + )), + getPreemptiveQuotaRemainingPercent5h: + configModule?.getPreemptiveQuotaRemainingPercent5h ?? + ((pluginConfig) => + parseNumberEnv( + "CODEX_AUTH_PREEMPTIVE_QUOTA_5H_REMAINING_PCT", + pluginConfig?.preemptiveQuotaRemainingPercent5h ?? 5, + 0, + 100, + )), + getPreemptiveQuotaRemainingPercent7d: + configModule?.getPreemptiveQuotaRemainingPercent7d ?? + ((pluginConfig) => + parseNumberEnv( + "CODEX_AUTH_PREEMPTIVE_QUOTA_7D_REMAINING_PCT", + pluginConfig?.preemptiveQuotaRemainingPercent7d ?? 5, + 0, + 100, + )), + }; +} + +function relaunchNotice(message) { + process.stderr.write(`codex-multi-auth: ${message}\n`); +} + +function supervisorDebug(message) { + if ( + parseBooleanEnv("CODEX_AUTH_CLI_SESSION_DEBUG", false) + ) { + relaunchNotice(message); + } +} + +function formatQuotaPressure(pressure) { + const parts = []; + if (typeof pressure.remaining5h === "number") { + parts.push(`5h=${pressure.remaining5h}%`); + } + if (typeof pressure.remaining7d === "number") { + parts.push(`7d=${pressure.remaining7d}%`); + } + return parts.length > 0 ? parts.join(" ") : "quota=unknown"; +} + +function logRotationSummary(sessionId, trace, nextReady) { + if (!parseBooleanEnv("CODEX_AUTH_CLI_SESSION_DEBUG", false)) { + return; + } + + const parts = []; + if (trace.detectedAtMs && trace.restartRequestedAtMs) { + parts.push(`detect_to_restart=${trace.restartRequestedAtMs - trace.detectedAtMs}ms`); + } + if (trace.prewarmStartedAtMs && trace.prewarmCompletedAtMs) { + parts.push( + `prewarm=${trace.prewarmCompletedAtMs - trace.prewarmStartedAtMs}ms`, + ); + } + if (trace.restartRequestedAtMs && trace.resumeReadyAtMs) { + parts.push(`restart_to_ready=${trace.resumeReadyAtMs - trace.restartRequestedAtMs}ms`); + } + + const accountLabel = + nextReady?.account?.email ?? + nextReady?.account?.accountId ?? + `index ${nextReady?.account?.index ?? "unknown"}`; + supervisorDebug( + `rotation summary session=${sessionId} account=${accountLabel} ${parts.join(" ")}`.trim(), + ); +} + +function normalizeExitCode(code, signal) { + if (signal) { + return signal === "SIGINT" ? 130 : 1; + } + return typeof code === "number" ? code : 1; +} + +function buildResumeArgs(sessionId, currentArgs) { + const { leadingArgs, command, trailingArgs } = splitCodexCommandArgs(currentArgs); + const remainingArgs = + (command === "resume" || command === "fork") && + isValidSessionId(trailingArgs[0]) + ? trailingArgs.slice(1) + : trailingArgs; + return [...leadingArgs, "resume", sessionId, ...remainingArgs]; +} + +function getCurrentAccount(manager) { + if (typeof manager.getCurrentAccountForFamily === "function") { + return manager.getCurrentAccountForFamily(CODEX_FAMILY); + } + if (typeof manager.getCurrentAccount === "function") { + return manager.getCurrentAccount(); + } + return null; +} + +function pickNextCandidate(manager) { + if (typeof manager.getCurrentOrNextForFamilyHybrid === "function") { + return manager.getCurrentOrNextForFamilyHybrid(CODEX_FAMILY); + } + if (typeof manager.getCurrentOrNext === "function") { + return manager.getCurrentOrNext(); + } + return null; +} + +function getNearestWaitMs(manager) { + if (typeof manager.getMinWaitTimeForFamily === "function") { + return Math.max(0, manager.getMinWaitTimeForFamily(CODEX_FAMILY)); + } + if (typeof manager.getMinWaitTime === "function") { + return Math.max(0, manager.getMinWaitTime()); + } + return 0; +} + +async function persistActiveSelection(manager, account, signal) { + if (typeof manager.setActiveIndex === "function") { + manager.setActiveIndex(account.index); + } + if (typeof manager.syncCodexCliActiveSelectionForIndex === "function") { + await manager.syncCodexCliActiveSelectionForIndex(account.index); + } + if (typeof manager.saveToDisk === "function") { + throwIfAborted(signal, "Supervisor storage lock lease lost"); + await manager.saveToDisk(); + } +} + +async function safeUnlink(path) { + for (let attempt = 0; attempt < DEFAULT_UNLINK_RETRY_ATTEMPTS; attempt += 1) { + try { + await fs.unlink(path); + return true; + } catch (error) { + const code = + error && typeof error === "object" && "code" in error + ? `${error.code ?? ""}` + : ""; + if (code === "ENOENT") { + return true; + } + const canRetry = + (code === "EPERM" || code === "EBUSY") && + attempt + 1 < DEFAULT_UNLINK_RETRY_ATTEMPTS; + if (!canRetry) { + return false; + } + await new Promise((resolve) => + setTimeout(resolve, DEFAULT_UNLINK_RETRY_BASE_DELAY_MS * (attempt + 1)), + ); + } + } + return false; +} + +function getSupervisorStoragePath(runtime) { + if (typeof runtime.getStoragePath === "function") { + let storagePath; + try { + storagePath = runtime.getStoragePath(); + } catch (error) { + throw new Error( + `Failed to resolve supervisor storage path via runtime.getStoragePath(): ${ + error instanceof Error ? error.message : String(error) + }`, + { cause: error }, + ); + } + if (typeof storagePath !== "string" || storagePath.trim().length === 0) { + throw new Error( + "Failed to resolve supervisor storage path via runtime.getStoragePath(): received an empty path", + ); + } + return storagePath; + } + + return join( + resolveCodexHomeDir(), + "multi-auth", + "openai-codex-accounts.json", + ); +} + +function getSupervisorStorageLockPath(runtime) { + return `${getSupervisorStoragePath(runtime)}.supervisor.lock`; +} + +function isValidSessionId(value) { + return SESSION_ID_PATTERN.test(`${value ?? ""}`.trim()); +} + +async function readSupervisorLockPayload(lockPath) { + try { + const raw = await fs.readFile(lockPath, "utf8"); + return JSON.parse(raw); + } catch { + return null; + } +} + +function createSupervisorLockPayload(ownerId, acquiredAt, ttlMs) { + return { + ownerId, + pid: process.pid, + acquiredAt, + expiresAt: Date.now() + ttlMs, + }; +} + +async function writeSupervisorLockPayload(lockPath, ownerId, acquiredAt, ttlMs) { + await fs.writeFile( + lockPath, + `${JSON.stringify( + createSupervisorLockPayload(ownerId, acquiredAt, ttlMs), + )}\n`, + "utf8", + ); +} + +async function safeUnlinkOwnedSupervisorLock(lockPath, ownerId) { + const payload = await readSupervisorLockPayload(lockPath); + if ( + payload && + typeof payload.ownerId === "string" && + payload.ownerId.length > 0 && + payload.ownerId !== ownerId + ) { + return false; + } + return safeUnlink(lockPath); +} + +function createSupervisorLockHeartbeat(lockPath, ownerId, acquiredAt, ttlMs, signal) { + const refreshMs = Math.max(25, Math.floor(ttlMs / 3)); + let stopped = false; + let failedError = null; + let settleHeartbeat; + let rejectHeartbeat; + let pendingRefresh = Promise.resolve(); + const heartbeatPromise = new Promise((resolve, reject) => { + settleHeartbeat = resolve; + rejectHeartbeat = reject; + }); + let timer = null; + const stopWithError = (error) => { + if (failedError || stopped) { + return; + } + failedError = error; + stopped = true; + if (timer) { + clearInterval(timer); + } + rejectHeartbeat(error); + }; + const refresh = () => { + if (stopped || signal?.aborted) { + return; + } + pendingRefresh = pendingRefresh.then(async () => { + const payload = await readSupervisorLockPayload(lockPath); + if (!payload) { + stopWithError( + new Error( + `Supervisor lock heartbeat lost lease at ${lockPath} for owner ${ownerId}: lock file disappeared`, + ), + ); + return; + } + if ( + typeof payload.ownerId === "string" && + payload.ownerId.length > 0 && + payload.ownerId !== ownerId + ) { + stopWithError( + new Error( + `Supervisor lock heartbeat lost lease at ${lockPath} for owner ${ownerId}: observed owner ${payload.ownerId}`, + ), + ); + return; + } + try { + await writeSupervisorLockPayload(lockPath, ownerId, acquiredAt, ttlMs); + } catch (error) { + stopWithError( + new Error( + `Failed to refresh supervisor lock lease at ${lockPath} for owner ${ownerId}: ${ + error instanceof Error ? error.message : String(error) + }`, + { cause: error }, + ), + ); + } + }); + }; + timer = setInterval(refresh, refreshMs); + if (typeof timer.unref === "function") { + timer.unref(); + } + return { + promise: heartbeatPromise, + stop: async () => { + stopped = true; + clearInterval(timer); + settleHeartbeat(); + await pendingRefresh; + if (failedError) { + throw failedError; + } + }, + }; +} + +async function isSupervisorLockStale(lockPath, ttlMs) { + const now = Date.now(); + const payload = await readSupervisorLockPayload(lockPath); + if (payload && typeof payload.expiresAt === "number" && payload.expiresAt <= now) { + return true; + } + + try { + const stat = await fs.stat(lockPath); + return now - stat.mtimeMs > ttlMs; + } catch (error) { + if ( + error && + typeof error === "object" && + "code" in error && + error.code === "ENOENT" + ) { + return true; + } + console.warn( + `codex-multi-auth: treating unreadable supervisor lock as stale at ${lockPath}: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + return true; + } +} + +async function withSupervisorStorageLock(runtime, fn, signal) { + const lockPath = getSupervisorStorageLockPath(runtime); + const lockDir = dirname(lockPath); + const waitMs = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_LOCK_WAIT_MS", + DEFAULT_STORAGE_LOCK_WAIT_MS, + 0, + ); + const pollMs = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_LOCK_POLL_MS", + DEFAULT_STORAGE_LOCK_POLL_MS, + 25, + ); + const ttlMs = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_LOCK_TTL_MS", + DEFAULT_STORAGE_LOCK_TTL_MS, + 1_000, + ); + + await fs.mkdir(lockDir, { recursive: true }); + + const deadline = Date.now() + waitMs; + while (true) { + if (signal?.aborted) { + throw createAbortError("Supervisor storage lock wait aborted"); + } + + try { + const handle = await fs.open(lockPath, "wx"); + const acquiredAt = Date.now(); + const ownerId = `${process.pid}:${acquiredAt}:${Math.random() + .toString(36) + .slice(2)}`; + try { + await handle.writeFile( + `${JSON.stringify( + createSupervisorLockPayload(ownerId, acquiredAt, ttlMs), + )}\n`, + "utf8", + ); + } finally { + await handle.close(); + } + const heartbeat = createSupervisorLockHeartbeat( + lockPath, + ownerId, + acquiredAt, + ttlMs, + signal, + ); + + const lease = createLinkedAbortController(signal); + let heartbeatError = null; + heartbeat.promise.catch((error) => { + heartbeatError = heartbeatError ?? error; + if (!lease.controller.signal.aborted) { + lease.controller.abort(); + } + }); + let result; + let fnError = null; + try { + result = await fn(lease.controller.signal); + } catch (error) { + fnError = error; + } finally { + lease.cleanup(); + try { + await heartbeat.stop(); + } catch (error) { + heartbeatError = heartbeatError ?? error; + } + await safeUnlinkOwnedSupervisorLock(lockPath, ownerId); + } + if (heartbeatError) { + if (fnError && fnError?.name !== "AbortError") { + throw fnError; + } + throw heartbeatError; + } + if (fnError) { + throw fnError; + } + return result; + } catch (error) { + const code = + error && typeof error === "object" && "code" in error + ? `${error.code ?? ""}` + : ""; + if (code !== "EEXIST" && code !== "EPERM" && code !== "EBUSY") { + throw error; + } + + if (await isSupervisorLockStale(lockPath, ttlMs)) { + const stalePayload = await readSupervisorLockPayload(lockPath); + const removed = + typeof stalePayload?.ownerId === "string" && + stalePayload.ownerId.length > 0 + ? await safeUnlinkOwnedSupervisorLock( + lockPath, + stalePayload.ownerId, + ) + : await safeUnlink(lockPath); + if (removed) continue; + } + + if (Date.now() >= deadline) { + throw new Error( + `Timed out waiting for supervisor storage lock at ${lockPath}`, + ); + } + + const slept = await sleep(pollMs, signal); + if (!slept) { + throw createAbortError("Supervisor storage lock wait aborted"); + } + } + } +} + +async function withLockedManager(runtime, mutate, signal) { + return withSupervisorStorageLock(runtime, async (lockSignal) => { + throwIfAborted(lockSignal, "Supervisor storage lock lease lost"); + const manager = await runtime.AccountManager.loadFromDisk(); + throwIfAborted(lockSignal, "Supervisor storage lock lease lost"); + return mutate(manager, lockSignal); + }, signal); +} + +function getManagerAccounts(manager, extraAccounts = []) { + const accounts = + typeof manager.getAccountsSnapshot === "function" + ? manager.getAccountsSnapshot() + : []; + const seen = new Set(); + const deduped = []; + for (const item of [...accounts, ...extraAccounts]) { + if (!item) continue; + const key = [ + `${item.index ?? ""}`, + `${item.accountId ?? ""}`, + `${item.email ?? ""}`, + ].join("|"); + if (seen.has(key)) continue; + seen.add(key); + deduped.push(item); + } + return deduped; +} + +function getAccountIdentityKey(account) { + if (!account) return ""; + return [ + `${account.index ?? ""}`, + `${account.accountId ?? ""}`, + `${account.email ?? ""}`, + ].join("|"); +} + +function isEligibleProbeAccount(account, now = Date.now()) { + return Boolean( + account && + account.enabled !== false && + (account.coolingDownUntil ?? 0) <= now, + ); +} + +function getProbeCandidateBatch(manager, limit, excludedAccounts = []) { + const accounts = getManagerAccounts(manager); + const leadingCandidate = pickNextCandidate(manager); + const ordered = leadingCandidate ? [leadingCandidate, ...accounts] : accounts; + const now = Date.now(); + const excludedKeys = new Set( + excludedAccounts.map((account) => getAccountIdentityKey(account)).filter(Boolean), + ); + const seen = new Set(); + const batch = []; + + for (const item of ordered) { + const account = resolveMatchingAccount(accounts, item) ?? item; + if (!isEligibleProbeAccount(account, now)) { + continue; + } + const key = getAccountIdentityKey(account); + if (excludedKeys.has(key)) { + continue; + } + if (seen.has(key)) { + continue; + } + seen.add(key); + batch.push(account); + if (batch.length >= limit) { + break; + } + } + + return batch; +} + +function resolveUniqueFieldMatch(accounts, field, value) { + if (!value) return null; + const matches = accounts.filter((item) => item && item[field] === value); + return matches.length === 1 ? matches[0] : null; +} + +function resolveMatchingAccount(accounts, account) { + if (!account) return null; + if (account.refreshToken) { + const byRefreshToken = + accounts.find((item) => item?.refreshToken === account.refreshToken) ?? + null; + if (byRefreshToken) return byRefreshToken; + } + const byAccountId = resolveUniqueFieldMatch( + accounts, + "accountId", + account.accountId, + ); + if (byAccountId) return byAccountId; + const byEmail = resolveUniqueFieldMatch(accounts, "email", account.email); + if (byEmail) return byEmail; + return accounts.find((item) => item?.index === account.index) ?? null; +} + +function resolveAccountInManager(manager, account, knownAccounts = null) { + if (!account) return null; + + const direct = + typeof manager.getAccountByIndex === "function" + ? manager.getAccountByIndex(account.index) + : null; + const current = getCurrentAccount(manager); + const candidate = pickNextCandidate(manager); + const accounts = + knownAccounts ?? getManagerAccounts(manager, [direct, current, candidate]); + + if (direct && resolveMatchingAccount(accounts, account) === direct) return direct; + if (current && resolveMatchingAccount(accounts, account) === current) return current; + if (candidate && resolveMatchingAccount(accounts, account) === candidate) { + return candidate; + } + + return resolveMatchingAccount(accounts, account); +} + +function accountsReferToSameStoredAccount( + manager, + left, + right, + knownAccounts = null, +) { + const accounts = knownAccounts ?? getManagerAccounts(manager, [left, right]); + const resolvedLeft = resolveAccountInManager(manager, left, accounts); + const resolvedRight = resolveAccountInManager(manager, right, accounts); + return Boolean( + resolvedLeft && + resolvedRight && + resolvedLeft.index === resolvedRight.index && + `${resolvedLeft.refreshToken ?? ""}` === `${resolvedRight.refreshToken ?? ""}`, + ); +} + +function computeWaitMsFromSnapshot(snapshot) { + const now = Date.now(); + const candidates = [snapshot?.primary?.resetAtMs, snapshot?.secondary?.resetAtMs] + .filter((value) => typeof value === "number" && Number.isFinite(value)) + .map((value) => Math.max(0, value - now)) + .filter((value) => value > 0); + return candidates.length > 0 ? Math.min(...candidates) : 0; +} + +function computeQuotaPressure(snapshot, runtime, pluginConfig) { + if (!snapshot) { + return { + prewarm: false, + rotate: false, + reason: "none", + waitMs: 0, + remaining5h: undefined, + remaining7d: undefined, + }; + } + + if (snapshot.status === 429) { + return { + prewarm: true, + rotate: true, + reason: "rate-limit", + waitMs: computeWaitMsFromSnapshot(snapshot), + remaining5h: undefined, + remaining7d: undefined, + }; + } + + if (!runtime.getPreemptiveQuotaEnabled(pluginConfig)) { + return { + prewarm: false, + rotate: false, + reason: "none", + waitMs: 0, + remaining5h: undefined, + remaining7d: undefined, + }; + } + + const remaining5h = + typeof snapshot.primary?.usedPercent === "number" + ? Math.max(0, Math.round(100 - snapshot.primary.usedPercent)) + : undefined; + const remaining7d = + typeof snapshot.secondary?.usedPercent === "number" + ? Math.max(0, Math.round(100 - snapshot.secondary.usedPercent)) + : undefined; + const threshold5h = runtime.getPreemptiveQuotaRemainingPercent5h(pluginConfig); + const threshold7d = runtime.getPreemptiveQuotaRemainingPercent7d(pluginConfig); + const prewarmThreshold5h = Math.min( + 100, + threshold5h + + parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_PREWARM_MARGIN_PERCENT_5H", + DEFAULT_PREWARM_MARGIN_PERCENT_5H, + 0, + ), + ); + const prewarmThreshold7d = Math.min( + 100, + threshold7d + + parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_PREWARM_MARGIN_PERCENT_7D", + DEFAULT_PREWARM_MARGIN_PERCENT_7D, + 0, + ), + ); + const near5h = + typeof remaining5h === "number" && remaining5h <= threshold5h; + const near7d = + typeof remaining7d === "number" && remaining7d <= threshold7d; + const prewarm5h = + typeof remaining5h === "number" && remaining5h <= prewarmThreshold5h; + const prewarm7d = + typeof remaining7d === "number" && remaining7d <= prewarmThreshold7d; + + if (!near5h && !near7d) { + return { + prewarm: prewarm5h || prewarm7d, + rotate: false, + reason: "none", + waitMs: 0, + remaining5h, + remaining7d, + }; + } + + return { + prewarm: true, + rotate: true, + reason: "quota-near-exhaustion", + waitMs: computeWaitMsFromSnapshot(snapshot), + remaining5h, + remaining7d, + }; +} + +function evaluateQuotaSnapshot(snapshot, runtime, pluginConfig) { + const pressure = computeQuotaPressure(snapshot, runtime, pluginConfig); + return { + rotate: pressure.rotate, + reason: pressure.reason, + waitMs: pressure.waitMs, + }; +} + +function getSnapshotCacheKey(account) { + if (!account) return ""; + const segments = [ + `${account.accountId ?? ""}`, + `${account.email ?? ""}`, + `${account.index ?? ""}`, + ]; + return segments.some((segment) => segment.length > 0) ? segments.join("|") : ""; +} + +function getSnapshotCacheTtlMs() { + return parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_SNAPSHOT_CACHE_TTL_MS", + DEFAULT_SNAPSHOT_CACHE_TTL_MS, + 0, + ); +} + +function clearProbeSnapshotCache(account) { + const cacheKey = getSnapshotCacheKey(account); + if (!cacheKey) return; + snapshotProbeCache.delete(cacheKey); +} + +function clearAllProbeSnapshotCache() { + snapshotProbeCache.clear(); +} + +function readCachedProbeSnapshot(account) { + const cacheKey = getSnapshotCacheKey(account); + if (!cacheKey) return null; + const entry = snapshotProbeCache.get(cacheKey); + if (!entry?.snapshot || entry.expiresAt <= Date.now()) { + if (entry && !entry.pending) { + snapshotProbeCache.delete(cacheKey); + } + return null; + } + return entry.snapshot; +} + +function rememberProbeSnapshot(account, snapshot) { + const cacheKey = getSnapshotCacheKey(account); + if (!cacheKey) return; + const ttlMs = getSnapshotCacheTtlMs(); + if (ttlMs <= 0) { + snapshotProbeCache.delete(cacheKey); + return; + } + const current = snapshotProbeCache.get(cacheKey); + snapshotProbeCache.set(cacheKey, { + ...current, + snapshot, + expiresAt: Date.now() + ttlMs, + }); +} + +async function probeAccountSnapshot(runtime, account, signal, timeoutMs, options = {}) { + if (signal?.aborted) { + throw createAbortError("Quota probe aborted"); + } + if (!account?.accountId || !account?.access) { + return null; + } + const cacheKey = getSnapshotCacheKey(account); + let pendingResolver = null; + let pendingRejecter = null; + let pendingPromise = null; + if (options.useCache !== false) { + const cachedSnapshot = readCachedProbeSnapshot(account); + if (cachedSnapshot) { + return cachedSnapshot; + } + const pendingEntry = cacheKey ? snapshotProbeCache.get(cacheKey) : null; + if (pendingEntry?.pending) { + return abortablePromise( + pendingEntry.pending, + signal, + "Quota probe aborted", + ); + } + if (cacheKey) { + pendingPromise = new Promise((resolve, reject) => { + pendingResolver = resolve; + pendingRejecter = reject; + }); + pendingPromise.catch(() => undefined); + snapshotProbeCache.set(cacheKey, { + snapshot: pendingEntry?.snapshot ?? null, + expiresAt: pendingEntry?.expiresAt ?? 0, + pending: pendingPromise, + }); + } + } + + const fetchPromise = (async () => { + try { + const snapshot = await runtime.fetchCodexQuotaSnapshot({ + accountId: account.accountId, + accessToken: account.access, + timeoutMs: timeoutMs ?? DEFAULT_QUOTA_PROBE_TIMEOUT_MS, + signal, + }); + rememberProbeSnapshot(account, snapshot); + pendingResolver?.(snapshot); + return snapshot; + } catch (error) { + const normalizedError = + signal?.aborted || error?.name === "AbortError" + ? error + : createProbeUnavailableError(error); + pendingRejecter?.(normalizedError); + if (signal?.aborted || error?.name === "AbortError") { + throw error; + } + throw normalizedError; + } finally { + if (cacheKey) { + const current = snapshotProbeCache.get(cacheKey); + if (current?.pending === pendingPromise) { + snapshotProbeCache.set(cacheKey, { + snapshot: current.snapshot ?? null, + expiresAt: current.expiresAt ?? 0, + }); + } + } + } + })(); + + try { + return await abortablePromise(fetchPromise, signal, "Quota probe aborted"); + } catch (error) { + if (signal?.aborted || error?.name === "AbortError") { + throw error; + } + if (error?.name === "QuotaProbeUnavailableError") { + throw error; + } + return null; + } +} + +function markAccountUnavailable(manager, account, evaluation) { + clearProbeSnapshotCache(account); + const waitMs = Math.max( + evaluation.waitMs || 0, + evaluation.reason === "rate-limit" ? 1 : 0, + ); + + if (waitMs > 0 && typeof manager.markRateLimitedWithReason === "function") { + manager.markRateLimitedWithReason( + account, + waitMs, + CODEX_FAMILY, + evaluation.reason === "rate-limit" + ? "rate_limit_detected" + : "quota_near_exhaustion", + ); + return; + } + + if (typeof manager.markAccountCoolingDown === "function") { + manager.markAccountCoolingDown( + account, + INTERNAL_RECOVERABLE_COOLDOWN_MS, + evaluation.reason === "rate-limit" ? "rate-limit" : "network-error", + ); + } +} + +async function markCurrentAccountForRestart( + runtime, + currentAccount, + restartDecision, + signal, +) { + if (!currentAccount || !restartDecision) { + return null; + } + + return withLockedManager(runtime, async (freshManager, lockSignal) => { + const targetAccount = resolveAccountInManager(freshManager, currentAccount); + if (targetAccount) { + markAccountUnavailable(freshManager, targetAccount, restartDecision); + if (typeof freshManager.saveToDisk === "function") { + throwIfAborted(lockSignal, "Supervisor storage lock lease lost"); + await freshManager.saveToDisk(); + } + } + return freshManager; + }, signal); +} + +async function ensureLaunchableAccount( + runtime, + pluginConfig, + signal, + options = {}, +) { + const probeTimeoutMs = + options.probeTimeoutMs ?? DEFAULT_SELECTION_PROBE_TIMEOUT_MS; + const probeBatchSize = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_SELECTION_PROBE_BATCH_SIZE", + DEFAULT_SELECTION_PROBE_BATCH_SIZE, + 1, + ); + let attempts = 0; + while (attempts < MAX_ACCOUNT_SELECTION_ATTEMPTS) { + attempts += 1; + if (signal?.aborted) { + return { ok: false, account: null, aborted: true }; + } + + const initial = await withLockedManager(runtime, async (manager) => { + const accounts = getProbeCandidateBatch( + manager, + probeBatchSize, + options.excludedAccounts ?? [], + ); + if (accounts.length === 0) { + return { + kind: "wait", + waitMs: getNearestWaitMs(manager), + account: null, + }; + } + return { + kind: "probe", + accounts, + }; + }, signal); + + if (initial.kind === "wait") { + if (initial.waitMs <= 0 || !runtime.getRetryAllAccountsRateLimited(pluginConfig)) { + return { ok: false, account: null }; + } + + relaunchNotice( + `all accounts unavailable, waiting ${Math.ceil(initial.waitMs / 1000)}s for the next eligible window`, + ); + const slept = await sleep(initial.waitMs, signal); + if (!slept) { + return { ok: false, account: null, aborted: true }; + } + continue; + } + + const probeResults = []; + for (const account of initial.accounts) { + probeResults.push( + (async () => { + try { + const snapshot = await probeAccountSnapshot( + runtime, + account, + signal, + probeTimeoutMs, + ); + return { + account, + snapshot, + evaluation: evaluateQuotaSnapshot(snapshot, runtime, pluginConfig), + }; + } catch (error) { + if (signal?.aborted || error?.name === "AbortError") { + throw error; + } + if (error?.name === "QuotaProbeUnavailableError") { + return { + account, + snapshot: null, + evaluation: { + rotate: true, + reason: "probe-unavailable", + waitMs: 0, + }, + }; + } + return { + account, + snapshot: null, + evaluation: { + rotate: true, + reason: "probe-error", + waitMs: 0, + }, + }; + } + })(), + ); + } + + let evaluatedResults; + try { + evaluatedResults = await Promise.all(probeResults); + } catch (error) { + if (signal?.aborted || error?.name === "AbortError") { + return { ok: false, account: null, aborted: true }; + } + throw error; + } + + const step = await withLockedManager(runtime, async (manager, lockSignal) => { + let dirty = false; + const knownAccounts = getManagerAccounts(manager, initial.accounts); + const probeUnavailableAccounts = []; + for (const result of evaluatedResults) { + const account = resolveAccountInManager( + manager, + result.account, + knownAccounts, + ); + const currentCandidate = + getProbeCandidateBatch( + manager, + 1, + [ + ...(options.excludedAccounts ?? []), + ...probeUnavailableAccounts, + ], + )[0] ?? null; + if ( + !account || + !currentCandidate || + !accountsReferToSameStoredAccount( + manager, + currentCandidate, + account, + knownAccounts, + ) + ) { + return { + kind: "retry", + waitMs: 0, + account: null, + }; + } + + if (!result.evaluation.rotate) { + if (options.persistSelection !== false) { + await persistActiveSelection( + manager, + account, + lockSignal, + ); + } + return { + kind: "ready", + waitMs: 0, + account, + manager, + snapshot: result.snapshot, + }; + } + + if (result.evaluation.reason === "probe-unavailable") { + probeUnavailableAccounts.push(account); + continue; + } + + markAccountUnavailable(manager, account, result.evaluation); + dirty = true; + } + if (dirty && typeof manager.saveToDisk === "function") { + throwIfAborted(lockSignal, "Supervisor storage lock lease lost"); + await manager.saveToDisk(); + } + return { + kind: "retry", + waitMs: 0, + account: null, + }; + }, signal); + + if (step.kind === "ready") { + return { + ok: true, + ...step, + }; + } + } + + return { ok: false, account: null }; +} + +async function commitPreparedSelection(runtime, selectedAccount, signal) { + if (!selectedAccount) { + return { ok: false, account: null }; + } + + return withLockedManager(runtime, async (manager, lockSignal) => { + const knownAccounts = getManagerAccounts(manager, [selectedAccount]); + const account = resolveAccountInManager(manager, selectedAccount, knownAccounts); + const currentCandidate = getProbeCandidateBatch(manager, 1)[0] ?? null; + if ( + !account || + !currentCandidate || + !accountsReferToSameStoredAccount( + manager, + currentCandidate, + account, + knownAccounts, + ) + ) { + return { ok: false, account: null, manager }; + } + + await persistActiveSelection(manager, account, lockSignal); + return { + ok: true, + account, + manager, + }; + }, signal); +} + +async function prepareResumeSelection({ + runtime, + pluginConfig, + currentAccount, + // Reserved for future restart-specific account-skip hints. + restartDecision, + signal, +}) { + const startedAtMs = Date.now(); + const nextReady = await ensureLaunchableAccount( + runtime, + pluginConfig, + signal, + { + probeTimeoutMs: resolveProbeTimeoutMs( + "CODEX_AUTH_CLI_SESSION_SELECTION_PROBE_TIMEOUT_MS", + DEFAULT_SELECTION_PROBE_TIMEOUT_MS, + ), + excludedAccounts: currentAccount ? [currentAccount] : [], + persistSelection: false, + }, + ); + + return { + startedAtMs, + completedAtMs: Date.now(), + nextReady, + }; +} + +function maybeStartPreparedResumeSelection({ + runtime, + pluginConfig, + currentAccount, + restartDecision, + signal, + preparedResumeSelectionStarted, + preparedResumeSelectionPromise, +}) { + if (preparedResumeSelectionStarted || !currentAccount || !restartDecision?.sessionId) { + return { + preparedResumeSelectionStarted, + preparedResumeSelectionPromise, + }; + } + + return { + preparedResumeSelectionStarted: true, + preparedResumeSelectionPromise: prepareResumeSelection({ + runtime, + pluginConfig, + currentAccount, + restartDecision, + signal, + }).catch((error) => { + if (error?.name !== "AbortError" && !signal?.aborted) { + supervisorDebug( + `pre-warm selection failed: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + } + return null; + }), + }; +} + +async function listJsonlFiles(rootDir) { + const files = []; + const pending = [rootDir]; + while (pending.length > 0) { + const nextDir = pending.pop(); + if (!nextDir) continue; + let entries = []; + try { + entries = await fs.readdir(nextDir, { withFileTypes: true }); + } catch { + continue; + } + for (const entry of entries) { + const fullPath = join(nextDir, entry.name); + if (entry.isSymbolicLink()) { + continue; + } + if (entry.isDirectory()) { + pending.push(fullPath); + continue; + } + if (entry.isFile() && entry.name.endsWith(".jsonl")) { + files.push(fullPath); + } + } + } + return files; +} + +function normalizeCwd(value) { + if (typeof value !== "string") return ""; + const trimmed = value.trim(); + if (trimmed.length === 0) return ""; + const normalized = resolvePath(trimmed).replace(/[\\/]+$/, ""); + return process.platform === "win32" ? normalized.toLowerCase() : normalized; +} + +async function extractSessionMeta(filePath) { + let stream = null; + let lineReader = null; + try { + stream = createReadStream(filePath, { encoding: "utf8" }); + lineReader = createInterface({ + input: stream, + crlfDelay: Infinity, + }); + + let scannedLineCount = 0; + for await (const rawLine of lineReader) { + const line = rawLine.trim(); + if (!line) continue; + scannedLineCount += 1; + if (scannedLineCount > SESSION_META_SCAN_LINE_LIMIT) break; + + try { + const parsed = JSON.parse(line); + const payload = + parsed?.session_meta?.payload ?? + (parsed?.type === "session_meta" ? parsed.payload : null); + const sessionId = `${payload?.id ?? ""}`.trim(); + const cwd = `${payload?.cwd ?? ""}`.trim(); + if (isValidSessionId(sessionId)) { + return { + sessionId, + cwd, + }; + } + } catch { + // Ignore malformed log lines. + } + } + } catch { + return null; + } finally { + lineReader?.close(); + stream?.destroy(); + } + + return null; +} + +async function matchSessionBindingEntry(entry, cwdKey, sessionId) { + const meta = await extractSessionMeta(entry.filePath); + if (!meta) return null; + if (sessionId && meta.sessionId === sessionId) { + return { + sessionId: meta.sessionId, + rolloutPath: entry.filePath, + lastActivityAtMs: entry.mtimeMs, + }; + } + const metaCwdKey = normalizeCwd(meta.cwd); + if (!cwdKey || !metaCwdKey || metaCwdKey !== cwdKey) return null; + return { + sessionId: meta.sessionId, + rolloutPath: entry.filePath, + lastActivityAtMs: entry.mtimeMs, + }; +} + +async function readSessionBindingEntry(filePath) { + try { + const stat = await fs.stat(filePath); + return { + filePath, + mtimeMs: stat.mtimeMs, + }; + } catch { + return null; + } +} + +async function findSessionBinding({ + cwd, + sinceMs, + sessionId, + rolloutPathHint, + sessionEntries, +}) { + const cwdKey = normalizeCwd(cwd); + const knownRolloutPath = + rolloutPathHint ?? (sessionId ? sessionRolloutPathById.get(sessionId) : null); + if (knownRolloutPath) { + const directEntry = await readSessionBindingEntry(knownRolloutPath); + if (directEntry) { + const directBinding = await matchSessionBindingEntry( + directEntry, + cwdKey, + sessionId, + ); + if (directBinding) { + rememberSessionBinding(directBinding); + return directBinding; + } + } + if (sessionId && !rolloutPathHint) { + sessionRolloutPathById.delete(sessionId); + } + } + + const files = (sessionEntries ?? + (await Promise.all( + (await listJsonlFiles(getSessionsRootDir())).map(async (filePath) => { + return readSessionBindingEntry(filePath); + }), + ))) + .filter(Boolean); + const passes = getSessionBindingEntryPasses( + files, + sinceMs, + sessionId, + Boolean(knownRolloutPath), + ); + for (const entries of passes) { + for (const entry of entries) { + const binding = await matchSessionBindingEntry(entry, cwdKey, sessionId); + if (binding) { + rememberSessionBinding(binding); + return binding; + } + } + } + + return null; +} + +async function waitForSessionBinding({ + cwd, + sinceMs, + sessionId, + rolloutPathHint, + timeoutMs, + signal, +}) { + const deadline = Date.now() + timeoutMs; + const pollMs = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_BINDING_POLL_MS", + DEFAULT_SESSION_BINDING_POLL_MS, + 25, + ); + const listingRefreshMs = Math.max(250, pollMs * 8); + let cachedSessionEntries = null; + let lastSessionEntriesRefreshAt = 0; + while (Date.now() <= deadline) { + if ( + !cachedSessionEntries || + Date.now() - lastSessionEntriesRefreshAt >= listingRefreshMs + ) { + cachedSessionEntries = ( + await Promise.all( + (await listJsonlFiles(getSessionsRootDir())).map(async (filePath) => { + return readSessionBindingEntry(filePath); + }), + ) + ).filter(Boolean); + lastSessionEntriesRefreshAt = Date.now(); + } + + const binding = await findSessionBinding({ + cwd, + sinceMs, + sessionId, + rolloutPathHint, + sessionEntries: cachedSessionEntries, + }); + if (binding) return binding; + const slept = await sleep(pollMs, signal); + if (!slept) return null; + } + return null; +} + +async function refreshSessionActivity(binding) { + if (!binding?.rolloutPath) return binding; + try { + const stat = await fs.stat(binding.rolloutPath); + return { + ...binding, + lastActivityAtMs: stat.mtimeMs, + }; + } catch { + return binding; + } +} + +async function requestChildRestart(child, platform = process.platform, signal) { + if (child.exitCode !== null) return; + + const signalTimeoutMs = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_SIGNAL_TIMEOUT_MS", + DEFAULT_SIGNAL_TIMEOUT_MS, + 50, + ); + const exitPromise = new Promise((resolve) => { + child.once("exit", () => resolve()); + }); + + if (platform !== "win32") { + child.kill("SIGINT"); + await Promise.race([exitPromise, sleep(signalTimeoutMs, signal)]); + if (child.exitCode !== null) return; + } + + child.kill("SIGTERM"); + await Promise.race([exitPromise, sleep(signalTimeoutMs, signal)]); + if (child.exitCode !== null) return; + + // On Windows, SIGTERM is already forceful; keep SIGKILL as the Unix fallback. + child.kill("SIGKILL"); + await Promise.race([exitPromise, sleep(signalTimeoutMs, signal)]); +} + +function spawnRealCodex(codexBin, args) { + return spawn(process.execPath, [codexBin, ...args], { + stdio: "inherit", + env: process.env, + }); +} + +async function loadCurrentSupervisorState(runtime, signal) { + return withLockedManager( + runtime, + async (freshManager) => ({ + manager: freshManager, + currentAccount: getCurrentAccount(freshManager), + }), + signal, + ); +} + +async function runInteractiveSupervision({ + codexBin, + initialArgs, + runtime, + pluginConfig, + manager, + signal, + maxSessionRestarts = MAX_SESSION_RESTARTS, + spawnChild = spawnRealCodex, + findBinding = findSessionBinding, + waitForBinding = waitForSessionBinding, + refreshBinding = refreshSessionActivity, + requestRestart = requestChildRestart, + loadCurrentState = loadCurrentSupervisorState, +}) { + let launchArgs = initialArgs; + let knownSessionId = readResumeSessionId(initialArgs); + let knownRolloutPath = null; + let launchCount = 0; + + while (launchCount < maxSessionRestarts) { + if (signal?.aborted) { + return 130; + } + launchCount += 1; + const preparedResumeSelectionLink = createLinkedAbortController(signal); + const preparedResumeSelectionController = + preparedResumeSelectionLink.controller; + const child = spawnChild(codexBin, launchArgs); + let preparedResumeSelectionPromise = null; + try { + const launchStartedAt = Date.now(); + let binding = knownSessionId + ? await findBinding({ + cwd: process.cwd(), + sinceMs: 0, + sessionId: knownSessionId, + rolloutPathHint: knownRolloutPath, + }) + : null; + if (binding?.rolloutPath) { + knownRolloutPath = binding.rolloutPath; + } + let requestedRestart = null; + let preparedResumeSelectionStarted = false; + const rotationTrace = { + detectedAtMs: 0, + prewarmStartedAtMs: 0, + prewarmCompletedAtMs: 0, + restartRequestedAtMs: 0, + resumeReadyAtMs: 0, + }; + let monitorActive = true; + const monitorController = new AbortController(); + + const pollMs = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_SUPERVISOR_POLL_MS", + DEFAULT_POLL_MS, + 250, + ); + const idleMs = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_SUPERVISOR_IDLE_MS", + DEFAULT_IDLE_MS, + 100, + ); + const captureTimeoutMs = parseNumberEnv( + "CODEX_AUTH_CLI_SESSION_CAPTURE_TIMEOUT_MS", + DEFAULT_SESSION_CAPTURE_TIMEOUT_MS, + 1_000, + ); + const monitorProbeTimeoutMs = resolveProbeTimeoutMs( + "CODEX_AUTH_CLI_SESSION_MONITOR_PROBE_TIMEOUT_MS", + DEFAULT_MONITOR_PROBE_TIMEOUT_MS, + ); + + let monitorFailure = null; + const monitorPromise = (async () => { + try { + while (monitorActive) { + if (!binding) { + binding = await waitForBinding({ + cwd: process.cwd(), + sinceMs: launchStartedAt, + sessionId: knownSessionId, + rolloutPathHint: knownRolloutPath, + timeoutMs: captureTimeoutMs, + signal: monitorController.signal, + }); + if (binding?.sessionId) { + knownSessionId = binding.sessionId; + knownRolloutPath = binding.rolloutPath ?? knownRolloutPath; + } + } else { + binding = await refreshBinding(binding); + if (binding?.rolloutPath) { + knownRolloutPath = binding.rolloutPath; + } + } + + if (!requestedRestart) { + let currentState; + try { + currentState = await loadCurrentState( + runtime, + monitorController.signal, + ); + } catch (error) { + if ( + monitorController.signal.aborted || + error?.name === "AbortError" + ) { + break; + } + throw error; + } + manager = currentState.manager ?? manager; + const currentAccount = currentState.currentAccount; + if (currentAccount) { + let snapshot; + try { + snapshot = await probeAccountSnapshot( + runtime, + currentAccount, + monitorController.signal, + monitorProbeTimeoutMs, + ); + } catch (error) { + if ( + monitorController.signal.aborted || + error?.name === "AbortError" + ) { + break; + } + if (error?.name === "QuotaProbeUnavailableError") { + const slept = await sleep( + pollMs, + monitorController.signal, + ); + if (!slept) { + break; + } + continue; + } + throw error; + } + const pressure = computeQuotaPressure( + snapshot, + runtime, + pluginConfig, + ); + if (pressure.prewarm && binding?.sessionId) { + if (!rotationTrace.detectedAtMs) { + rotationTrace.detectedAtMs = Date.now(); + } + if (!preparedResumeSelectionStarted) { + rotationTrace.prewarmStartedAtMs = Date.now(); + supervisorDebug( + `prewarming successor for session ${binding.sessionId} ${formatQuotaPressure(pressure)}`, + ); + const preparedState = maybeStartPreparedResumeSelection({ + runtime, + pluginConfig, + currentAccount, + restartDecision: { + sessionId: binding.sessionId, + }, + signal: preparedResumeSelectionController.signal, + preparedResumeSelectionStarted, + preparedResumeSelectionPromise, + }); + preparedResumeSelectionStarted = + preparedState.preparedResumeSelectionStarted; + preparedResumeSelectionPromise = + preparedState.preparedResumeSelectionPromise?.then((prepared) => { + if (rotationTrace.prewarmCompletedAtMs === 0) { + rotationTrace.prewarmCompletedAtMs = Date.now(); + } + return prepared; + }) ?? null; + } + } + if (pressure.rotate && binding?.sessionId) { + const pendingRestartDecision = { + reason: pressure.reason, + waitMs: pressure.waitMs, + sessionId: binding.sessionId, + }; + const lastActivityAtMs = + binding.lastActivityAtMs ?? launchStartedAt; + if (Date.now() - lastActivityAtMs >= idleMs) { + requestedRestart = pendingRestartDecision; + rotationTrace.restartRequestedAtMs = Date.now(); + relaunchNotice( + `rotating session ${binding.sessionId} because ${pressure.reason.replace(/-/g, " ")} (${formatQuotaPressure(pressure)})`, + ); + monitorActive = false; + await requestRestart(child, process.platform, signal); + monitorController.abort(); + continue; + } + } + } + } + + const slept = await sleep(pollMs, monitorController.signal); + if (!slept) break; + } + } catch (error) { + if (!monitorController.signal.aborted && error?.name !== "AbortError") { + monitorFailure = error; + } + } + })(); + + const result = await abortablePromise( + new Promise((resolve) => { + child.once("error", (error) => { + resolve({ + exitCode: 1, + error, + }); + }); + child.once("exit", (code, exitSignal) => { + resolve({ + exitCode: normalizeExitCode(code, exitSignal), + signal: exitSignal, + }); + }); + }), + signal, + "Supervisor child wait aborted", + ).catch(async (error) => { + if (error?.name !== "AbortError" || !signal?.aborted) { + throw error; + } + monitorActive = false; + monitorController.abort(); + await requestRestart(child, process.platform); + return { + exitCode: 130, + signal: "SIGTERM", + }; + }); + + monitorActive = false; + monitorController.abort(); + await monitorPromise; + if (monitorFailure && !signal?.aborted) { + relaunchNotice( + `monitor loop failed: ${monitorFailure instanceof Error ? monitorFailure.message : String(monitorFailure)}`, + ); + return result.exitCode === 0 ? 1 : result.exitCode; + } + binding = + binding ?? + (await findBinding({ + cwd: process.cwd(), + sinceMs: launchStartedAt, + sessionId: knownSessionId, + rolloutPathHint: knownRolloutPath, + })); + if (binding?.sessionId) { + knownSessionId = binding.sessionId; + knownRolloutPath = binding.rolloutPath ?? knownRolloutPath; + } + + let restartDecision = requestedRestart; + if (!restartDecision && result.exitCode !== 0 && knownSessionId) { + const refreshedState = await withLockedManager( + runtime, + async (freshManager) => ({ + manager: freshManager, + currentAccount: getCurrentAccount(freshManager), + }), + signal, + ); + manager = refreshedState.manager ?? manager; + if (signal?.aborted) { + return result.exitCode; + } + let snapshot = null; + if (refreshedState.currentAccount) { + try { + snapshot = await probeAccountSnapshot( + runtime, + refreshedState.currentAccount, + signal, + ); + } catch (error) { + if (signal?.aborted || error?.name === "AbortError") { + throw error; + } + if (error?.name !== "QuotaProbeUnavailableError") { + throw error; + } + } + } + const evaluation = evaluateQuotaSnapshot(snapshot, runtime, pluginConfig); + if (evaluation.rotate) { + restartDecision = { + reason: evaluation.reason, + waitMs: evaluation.waitMs, + sessionId: knownSessionId, + }; + } + } + + if (!restartDecision) { + return result.exitCode; + } + + if (!restartDecision.sessionId) { + relaunchNotice( + "rotation needed but no resumable session was captured; re-run `codex` manually", + ); + return result.exitCode; + } + + const currentAccount = getCurrentAccount(manager); + if (currentAccount) { + const refreshedManager = await markCurrentAccountForRestart( + runtime, + currentAccount, + restartDecision, + signal, + ); + manager = refreshedManager ?? manager; + } + + let nextReady = null; + if (preparedResumeSelectionPromise) { + const prepared = await preparedResumeSelectionPromise; + nextReady = prepared?.nextReady ?? null; + } + if (nextReady?.ok) { + const committedReady = await commitPreparedSelection( + runtime, + nextReady.account, + signal, + ); + if (committedReady?.ok) { + nextReady = committedReady; + } else { + nextReady = null; + } + } + + if (!nextReady) { + nextReady = await ensureLaunchableAccount(runtime, pluginConfig, signal, { + probeTimeoutMs: resolveProbeTimeoutMs( + "CODEX_AUTH_CLI_SESSION_SELECTION_PROBE_TIMEOUT_MS", + DEFAULT_SELECTION_PROBE_TIMEOUT_MS, + ), + }); + } + if (nextReady.aborted) { + return 130; + } + if (!nextReady.ok) { + relaunchNotice( + `no healthy account available to resume ${restartDecision.sessionId}; recover manually with \`codex resume ${restartDecision.sessionId}\` when quota resets`, + ); + return result.exitCode; + } + + manager = nextReady.manager ?? manager; + rotationTrace.resumeReadyAtMs = Date.now(); + logRotationSummary(restartDecision.sessionId, rotationTrace, nextReady); + launchArgs = buildResumeArgs(restartDecision.sessionId, launchArgs); + knownSessionId = restartDecision.sessionId; + knownRolloutPath = binding?.rolloutPath ?? knownRolloutPath; + } finally { + preparedResumeSelectionLink.cleanup(); + if (preparedResumeSelectionPromise) { + await preparedResumeSelectionPromise.catch(() => null); + } + } + } + + relaunchNotice("session supervisor reached the restart safety limit"); + return 1; +} + +async function runCodexSupervisorWithRuntime({ + codexBin, + rawArgs, + buildForwardArgs, + forwardToRealCodex, + runtime, + signal, +}) { + const pluginConfig = runtime.loadPluginConfig(); + if (!runtime.getCodexCliSessionSupervisor(pluginConfig)) { + return null; + } + + const initialArgs = buildForwardArgs(rawArgs); + if (isSupervisorAccountGateBypassCommand(rawArgs)) { + return forwardToRealCodex(codexBin, initialArgs); + } + + const ready = await ensureLaunchableAccount(runtime, pluginConfig, signal); + if (ready.aborted) { + return 130; + } + if (!ready.ok) { + relaunchNotice("no launchable account is currently available"); + return 1; + } + + if (isNonInteractiveCommand(rawArgs)) { + return forwardToRealCodex(codexBin, initialArgs); + } + + return runInteractiveSupervision({ + codexBin, + initialArgs, + runtime, + pluginConfig, + manager: ready.manager, + signal, + }); +} + +export async function runCodexSupervisorIfEnabled({ + codexBin, + rawArgs, + buildForwardArgs, + forwardToRealCodex, +}) { + const controller = new AbortController(); + const abort = () => controller.abort(); + process.once("SIGINT", abort); + process.once("SIGTERM", abort); + + try { + const runtime = await loadSupervisorRuntime(); + if (!runtime) { + return null; + } + return await runCodexSupervisorWithRuntime({ + codexBin, + rawArgs, + buildForwardArgs, + forwardToRealCodex, + runtime, + signal: controller.signal, + }); + } catch (error) { + if (error?.name === "AbortError") { + return 130; + } + throw error; + } finally { + process.off("SIGINT", abort); + process.off("SIGTERM", abort); + } +} + +const TEST_ONLY_API = { + commitPreparedSelection, + clearAllProbeSnapshotCache, + computeQuotaPressure, + clearProbeSnapshotCache, + evaluateQuotaSnapshot, + ensureLaunchableAccount, + findSessionBinding, + extractSessionMeta, + isInteractiveCommand, + isValidSessionId, + createLinkedAbortController, + getSessionBindingEntryPasses, + listJsonlFiles, + maybeStartPreparedResumeSelection, + prepareResumeSelection, + probeAccountSnapshot, + readResumeSessionId, + markCurrentAccountForRestart, + requestChildRestart, + resolveCodexHomeDir, + createRuntimeConfigAccessors, + getSessionsRootDir, + getSnapshotCacheKey, + sleep, + safeUnlinkOwnedSupervisorLock, + withLockedManager, + getSupervisorStorageLockPath, + runInteractiveSupervision, + runCodexSupervisorWithRuntime, + waitForSessionBinding, + clearSessionBindingPathCache, +}; + +export const __testOnly = + process.env.NODE_ENV === "test" ? TEST_ONLY_API : undefined; diff --git a/scripts/codex.js b/scripts/codex.js index 14487b38..1197c616 100755 --- a/scripts/codex.js +++ b/scripts/codex.js @@ -6,7 +6,14 @@ import { createRequire } from "node:module"; import { basename, delimiter, dirname, join, resolve as resolvePath } from "node:path"; import process from "node:process"; import { fileURLToPath } from "node:url"; -import { normalizeAuthAlias, shouldHandleMultiAuthAuth } from "./codex-routing.js"; +import { + normalizeAuthAlias, + shouldHandleMultiAuthAuth, +} from "./codex-routing.js"; +import { + isInteractiveCommand as isSupervisorInteractiveCommand, + runCodexSupervisorIfEnabled, +} from "./codex-supervisor.js"; function hydrateCliVersionEnv() { try { @@ -524,9 +531,30 @@ async function main() { return 1; } - await autoSyncManagerActiveSelectionIfEnabled(); const forwardArgs = buildForwardArgs(rawArgs); - return forwardToRealCodex(realCodexBin, forwardArgs); + let supervisorDidForward = false; + const forwardToRealCodexWithStartupSync = async (codexBin, args) => { + supervisorDidForward = true; + await autoSyncManagerActiveSelectionIfEnabled(); + return forwardToRealCodex(codexBin, args); + }; + const supervisedExitCode = await runCodexSupervisorIfEnabled({ + codexBin: realCodexBin, + rawArgs, + buildForwardArgs, + forwardToRealCodex: forwardToRealCodexWithStartupSync, + }); + if (supervisedExitCode !== null) { + if (supervisedExitCode === 130) { + return 130; + } + if (isSupervisorInteractiveCommand(rawArgs) && !supervisorDidForward) { + await autoSyncManagerActiveSelectionIfEnabled(); + } + return supervisedExitCode; + } + + return forwardToRealCodexWithStartupSync(realCodexBin, forwardArgs); } const exitCode = await main(); diff --git a/test/codex-bin-wrapper.test.ts b/test/codex-bin-wrapper.test.ts index cb173334..75057040 100644 --- a/test/codex-bin-wrapper.test.ts +++ b/test/codex-bin-wrapper.test.ts @@ -1,6 +1,7 @@ import { type SpawnSyncReturns, spawn, spawnSync } from "node:child_process"; import { copyFileSync, + existsSync, mkdirSync, mkdtempSync, readFileSync, @@ -57,9 +58,93 @@ function createWrapperFixture(): string { join(repoRootDir, "scripts", "codex-routing.js"), join(scriptDir, "codex-routing.js"), ); + copyFileSync( + join(repoRootDir, "scripts", "codex-supervisor.js"), + join(scriptDir, "codex-supervisor.js"), + ); return fixtureRoot; } +function writeSupervisorRuntimeFixture(fixtureRoot: string): void { + const distLibDir = join(fixtureRoot, "dist", "lib"); + mkdirSync(distLibDir, { recursive: true }); + writeFileSync( + join(distLibDir, "config.js"), + [ + "export function loadPluginConfig() {", + "\treturn {", + "\t\tcodexCliSessionSupervisor: true,", + "\t\tretryAllAccountsRateLimited: true,", + "\t\tpreemptiveQuotaEnabled: true,", + "\t\tpreemptiveQuotaRemainingPercent5h: 10,", + "\t\tpreemptiveQuotaRemainingPercent7d: 5,", + "\t};", + "}", + ].join("\n"), + "utf8", + ); + writeFileSync( + join(distLibDir, "accounts.js"), + [ + 'import { appendFile } from "node:fs/promises";', + "export class AccountManager {", + "\tstatic async loadFromDisk() {", + '\t\tconst markerPath = process.env.CODEX_TEST_SUPERVISOR_MARKER ?? "";', + "\t\tif (markerPath) {", + '\t\t\tawait appendFile(markerPath, "supervisor\\n", "utf8");', + "\t\t}", + "\t\treturn new AccountManager();", + "\t}", + "\tgetCurrentOrNextForFamilyHybrid() {", + '\t\treturn { index: 0, email: "healthy@example.com" };', + "\t}", + "\tsetActiveIndex() {}", + "\tasync saveToDisk() {}", + "}", + ].join("\n"), + "utf8", + ); + writeFileSync( + join(distLibDir, "quota-probe.js"), + [ + "export async function fetchCodexQuotaSnapshot() {", + "\treturn null;", + "}", + ].join("\n"), + "utf8", + ); + writeFileSync( + join(distLibDir, "storage.js"), + [ + "export function getStoragePath() {", + `\treturn ${JSON.stringify(join(fixtureRoot, "openai-codex-accounts.json"))};`, + "}", + ].join("\n"), + "utf8", + ); +} + +function writeSupervisorStub(fixtureRoot: string, lines: string[]): void { + writeFileSync(join(fixtureRoot, "scripts", "codex-supervisor.js"), lines.join("\n"), "utf8"); +} + +function writeCodexManagerAutoSyncFixture(fixtureRoot: string): void { + const distLibDir = join(fixtureRoot, "dist", "lib"); + mkdirSync(distLibDir, { recursive: true }); + writeFileSync( + join(distLibDir, "codex-manager.js"), + [ + 'import { appendFile } from "node:fs/promises";', + "export async function autoSyncActiveAccountToCodex() {", + '\tconst markerPath = process.env.CODEX_TEST_AUTO_SYNC_MARKER ?? "";', + "\tif (!markerPath) return;", + '\tawait appendFile(markerPath, "sync\\n", "utf8");', + "}", + ].join("\n"), + "utf8", + ); +} + function createFakeCodexBin(rootDir: string): string { const fakeBin = join(rootDir, "fake-codex.js"); writeFileSync( @@ -358,6 +443,10 @@ describe("codex bin wrapper", () => { join(repoRootDir, "scripts", "codex-routing.js"), join(scriptDir, "codex-routing.js"), ); + copyFileSync( + join(repoRootDir, "scripts", "codex-supervisor.js"), + join(scriptDir, "codex-supervisor.js"), + ); writeFileSync( join(globalShimDir, "codex-multi-auth.cmd"), "@ECHO OFF\r\nREM real shim\r\n", @@ -539,4 +628,115 @@ describe("codex bin wrapper", () => { ); } }); + + it("uses the supervisor wrapper for non-interactive commands when enabled", () => { + const fixtureRoot = createWrapperFixture(); + writeSupervisorRuntimeFixture(fixtureRoot); + const fakeBin = createFakeCodexBin(fixtureRoot); + const markerPath = join(fixtureRoot, "supervisor-marker.log"); + + const result = runWrapper(fixtureRoot, ["exec", "status"], { + CODEX_MULTI_AUTH_REAL_CODEX_BIN: fakeBin, + CODEX_AUTH_CLI_SESSION_SUPERVISOR: "1", + CODEX_TEST_SUPERVISOR_MARKER: markerPath, + }); + + expect(result.status).toBe(0); + expect(result.stdout).toContain( + 'FORWARDED:exec status -c cli_auth_credentials_store="file"', + ); + expect(readFileSync(markerPath, "utf8")).toContain("supervisor\n"); + }); + + it("auto-syncs once for a supervisor-forwarded command", () => { + const fixtureRoot = createWrapperFixture(); + writeSupervisorRuntimeFixture(fixtureRoot); + writeCodexManagerAutoSyncFixture(fixtureRoot); + const fakeBin = createFakeCodexBin(fixtureRoot); + const markerPath = join(fixtureRoot, "auto-sync.log"); + + const result = runWrapper(fixtureRoot, ["exec", "status"], { + CODEX_MULTI_AUTH_REAL_CODEX_BIN: fakeBin, + CODEX_AUTH_CLI_SESSION_SUPERVISOR: "1", + CODEX_TEST_AUTO_SYNC_MARKER: markerPath, + }); + + expect(result.status).toBe(0); + expect(result.stdout.match(/FORWARDED:/g) ?? []).toHaveLength(1); + expect(readFileSync(markerPath, "utf8")).toBe("sync\n"); + }); + + it("skips startup auto-sync when the supervisor returns the abort sentinel", () => { + const fixtureRoot = createWrapperFixture(); + writeSupervisorStub(fixtureRoot, [ + "export function isInteractiveCommand() {", + "\treturn true;", + "}", + "export async function runCodexSupervisorIfEnabled() {", + "\treturn 130;", + "}", + ]); + writeCodexManagerAutoSyncFixture(fixtureRoot); + const fakeBin = createFakeCodexBin(fixtureRoot); + const markerPath = join(fixtureRoot, "abort-auto-sync.log"); + + const result = runWrapper(fixtureRoot, ["resume", "session-123"], { + CODEX_MULTI_AUTH_REAL_CODEX_BIN: fakeBin, + CODEX_TEST_AUTO_SYNC_MARKER: markerPath, + }); + + expect(result.status).toBe(130); + expect(result.stdout).not.toContain("FORWARDED:"); + expect(existsSync(markerPath)).toBe(false); + }); + + it("supports interactive commands through the supervisor wrapper", () => { + const fixtureRoot = createWrapperFixture(); + writeSupervisorRuntimeFixture(fixtureRoot); + writeCodexManagerAutoSyncFixture(fixtureRoot); + const fakeBin = createFakeCodexBin(fixtureRoot); + const markerPath = join(fixtureRoot, "interactive-auto-sync.log"); + + const result = runWrapper(fixtureRoot, [], { + CODEX_MULTI_AUTH_REAL_CODEX_BIN: fakeBin, + CODEX_AUTH_CLI_SESSION_SUPERVISOR: "1", + CODEX_TEST_AUTO_SYNC_MARKER: markerPath, + }); + + expect(result.status).toBe(0); + expect(result.stdout).toContain('FORWARDED:-c cli_auth_credentials_store="file"'); + expect(result.stdout.match(/FORWARDED:/g) ?? []).toHaveLength(1); + expect(readFileSync(markerPath, "utf8")).toBe("sync\n"); + }); + + it("avoids double sync when the supervisor forwards an interactive command", () => { + const fixtureRoot = createWrapperFixture(); + writeSupervisorStub(fixtureRoot, [ + "export function isInteractiveCommand(rawArgs) {", + "\treturn Array.isArray(rawArgs) && rawArgs.includes(\"resume\");", + "}", + "export async function runCodexSupervisorIfEnabled({ codexBin, rawArgs, buildForwardArgs, forwardToRealCodex }) {", + "\tawait forwardToRealCodex(codexBin, buildForwardArgs(rawArgs));", + "\treturn 0;", + "}", + ]); + writeCodexManagerAutoSyncFixture(fixtureRoot); + const fakeBin = createFakeCodexBin(fixtureRoot); + const markerPath = join(fixtureRoot, "interactive-option-auto-sync.log"); + + const result = runWrapper( + fixtureRoot, + ["-c", 'profile="dev"', "resume", "session-123"], + { + CODEX_MULTI_AUTH_REAL_CODEX_BIN: fakeBin, + CODEX_TEST_AUTO_SYNC_MARKER: markerPath, + }, + ); + + expect(result.status).toBe(0); + expect(result.stdout).toContain( + 'FORWARDED:-c profile="dev" resume session-123 -c cli_auth_credentials_store="file"', + ); + expect(readFileSync(markerPath, "utf8")).toBe("sync\n"); + }); }); diff --git a/test/codex-supervisor.test.ts b/test/codex-supervisor.test.ts new file mode 100644 index 00000000..4ee38028 --- /dev/null +++ b/test/codex-supervisor.test.ts @@ -0,0 +1,1801 @@ +import { EventEmitter } from "node:events"; +import { mkdtempSync, promises as fs } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { __testOnly as supervisorTestApi } from "../scripts/codex-supervisor.js"; + +const createdDirs: string[] = []; +const envKeys = [ + "CODEX_AUTH_CLI_SESSION_SIGNAL_TIMEOUT_MS", + "CODEX_AUTH_CLI_SESSION_BINDING_POLL_MS", + "CODEX_AUTH_CLI_SESSION_LOCK_POLL_MS", + "CODEX_AUTH_CLI_SESSION_LOCK_WAIT_MS", + "CODEX_AUTH_CLI_SESSION_SUPERVISOR", + "CODEX_AUTH_CLI_SESSION_SUPERVISOR_POLL_MS", + "CODEX_AUTH_CLI_SESSION_SUPERVISOR_IDLE_MS", + "CODEX_AUTH_CLI_SESSION_SNAPSHOT_CACHE_TTL_MS", + "CODEX_AUTH_RETRY_ALL_RATE_LIMITED", + "CODEX_AUTH_PREEMPTIVE_QUOTA_ENABLED", + "CODEX_AUTH_PREEMPTIVE_QUOTA_5H_REMAINING_PCT", + "CODEX_AUTH_PREEMPTIVE_QUOTA_7D_REMAINING_PCT", + "CODEX_HOME", +] as const; +const originalEnv = Object.fromEntries( + envKeys.map((key) => [key, process.env[key]]), +) as Record<(typeof envKeys)[number], string | undefined>; + +async function removeDirectoryWithRetry(dir: string): Promise { + const retryableCodes = new Set(["ENOTEMPTY", "EPERM", "EBUSY"]); + for (let attempt = 1; attempt <= 6; attempt += 1) { + try { + await fs.rm(dir, { recursive: true, force: true }); + return; + } catch (error) { + const code = + error && typeof error === "object" && "code" in error + ? `${error.code ?? ""}` + : ""; + if (!retryableCodes.has(code) || attempt === 6) { + throw error; + } + await new Promise((resolve) => setTimeout(resolve, attempt * 50)); + } + } +} + +function createTempDir(): string { + const dir = mkdtempSync(join(tmpdir(), "codex-supervisor-test-")); + createdDirs.push(dir); + return dir; +} + +function createDeferred() { + let resolve!: (value: T | PromiseLike) => void; + const promise = new Promise((res) => { + resolve = res; + }); + return { promise, resolve }; +} + +class FakeManager { + private accounts: Array<{ + index: number; + accountId: string; + access: string; + email: string; + refreshToken: string; + enabled: boolean; + coolingDownUntil: number; + }>; + + activeIndex = 0; + + constructor( + accounts: Array<{ + accountId: string; + access?: string; + email?: string; + refreshToken?: string; + enabled?: boolean; + coolingDownUntil?: number; + }> = [ + { accountId: "near-limit", access: "token-1" }, + { accountId: "healthy", access: "token-2" }, + ], + ) { + this.accounts = accounts.map((account, index) => ({ + index, + accountId: account.accountId, + access: account.access ?? `token-${index + 1}`, + email: account.email ?? `${account.accountId}@example.com`, + refreshToken: account.refreshToken ?? `rt-${account.accountId}`, + enabled: account.enabled ?? true, + coolingDownUntil: account.coolingDownUntil ?? 0, + })); + } + + getAccountsSnapshot() { + return this.accounts.map((account) => ({ ...account })); + } + + getAccountByIndex(index: number) { + return this.accounts.find((account) => account.index === index) ?? null; + } + + getCurrentAccountForFamily() { + return this.getAccountByIndex(this.activeIndex); + } + + getCurrentOrNextForFamilyHybrid() { + const now = Date.now(); + const ordered = [ + this.getCurrentAccountForFamily(), + ...this.accounts.filter((account) => account.index !== this.activeIndex), + ].filter(Boolean); + return ( + ordered.find( + (account) => + account.enabled !== false && account.coolingDownUntil <= now, + ) ?? null + ); + } + + getMinWaitTimeForFamily() { + const now = Date.now(); + const waits = this.accounts + .map((account) => Math.max(0, account.coolingDownUntil - now)) + .filter((waitMs) => waitMs > 0); + return waits.length > 0 ? Math.min(...waits) : 0; + } + + markRateLimitedWithReason( + account: { index: number }, + waitMs: number, + ) { + const target = this.getAccountByIndex(account.index); + if (!target) return; + target.coolingDownUntil = Date.now() + Math.max(waitMs, 1); + } + + markAccountCoolingDown( + account: { index: number }, + waitMs: number, + ) { + this.markRateLimitedWithReason(account, waitMs); + } + + setActiveIndex(index: number) { + this.activeIndex = index; + } + + async syncCodexCliActiveSelectionForIndex() {} + + async saveToDisk() {} +} + +function createFakeRuntime( + manager: FakeManager, + options: { + quotaProbeDelayMs?: number; + snapshots?: Map< + string, + { + status: number; + primary?: { usedPercent?: number }; + secondary?: { usedPercent?: number }; + } + >; + delayByAccountId?: Map; + waitForFetchByAccountId?: Map>; + onFetch?: (accountId: string) => void; + onFetchStart?: (accountId: string) => void; + } = {}, +) { + const storageDir = createTempDir(); + const snapshots = + options.snapshots ?? + new Map([ + [ + "near-limit", + { + status: 200, + primary: { usedPercent: 91 }, + secondary: { usedPercent: 12 }, + }, + ], + [ + "healthy", + { + status: 200, + primary: { usedPercent: 25 }, + secondary: { usedPercent: 8 }, + }, + ], + ]); + const fallbackProbeDelayMs = options.quotaProbeDelayMs ?? 0; + const delayByAccountId = options.delayByAccountId ?? new Map(); + const waitForFetchByAccountId = options.waitForFetchByAccountId ?? new Map(); + + return { + AccountManager: { + async loadFromDisk() { + return manager; + }, + }, + getStoragePath() { + return join(storageDir, "accounts.json"); + }, + getPreemptiveQuotaEnabled() { + return true; + }, + getPreemptiveQuotaRemainingPercent5h() { + return 10; + }, + getPreemptiveQuotaRemainingPercent7d() { + return 5; + }, + getRetryAllAccountsRateLimited() { + return true; + }, + async fetchCodexQuotaSnapshot({ + accountId, + signal, + }: { + accountId: string; + signal?: AbortSignal; + }) { + options.onFetch?.(accountId); + options.onFetchStart?.(accountId); + const gate = waitForFetchByAccountId.get(accountId); + if (gate) { + await gate; + } + const quotaProbeDelayMs = + delayByAccountId.get(accountId) ?? fallbackProbeDelayMs; + await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + signal?.removeEventListener("abort", onAbort); + resolve(); + }, quotaProbeDelayMs); + const onAbort = () => { + clearTimeout(timer); + const error = new Error("Quota probe aborted"); + error.name = "AbortError"; + reject(error); + }; + signal?.addEventListener("abort", onAbort, { once: true }); + }); + return snapshots.get(accountId) ?? null; + }, + }; +} + +afterEach(async () => { + vi.useRealTimers(); + supervisorTestApi?.clearAllProbeSnapshotCache?.(); + supervisorTestApi?.clearSessionBindingPathCache?.(); + for (const key of envKeys) { + const value = originalEnv[key]; + if (value === undefined) { + delete process.env[key]; + continue; + } + process.env[key] = value; + } + for (const dir of createdDirs.splice(0, createdDirs.length).reverse()) { + await removeDirectoryWithRetry(dir); + } +}); + +describe("codex supervisor", () => { + it("finds session metadata when it lands on the 200th non-empty line", async () => { + expect(supervisorTestApi).toBeDefined(); + const dir = createTempDir(); + const filePath = join(dir, "boundary.jsonl"); + const preamble = Array.from({ length: 199 }, (_unused, index) => + JSON.stringify({ type: "event", seq: index + 1 }), + ); + await fs.writeFile( + filePath, + [ + ...preamble, + JSON.stringify({ + session_meta: { + payload: { id: "boundary-session", cwd: dir }, + }, + }), + ].join("\n"), + "utf8", + ); + + await expect(supervisorTestApi?.extractSessionMeta(filePath)).resolves.toEqual({ + sessionId: "boundary-session", + cwd: dir, + }); + }); + + it("misses session metadata beyond the 200-line scan limit", async () => { + const dir = createTempDir(); + const filePath = join(dir, "over-limit.jsonl"); + const preamble = Array.from({ length: 200 }, (_unused, index) => + JSON.stringify({ type: "event", seq: index + 1 }), + ); + await fs.writeFile( + filePath, + [ + ...preamble, + JSON.stringify({ + session_meta: { + payload: { id: "missed-session", cwd: dir }, + }, + }), + ].join("\n"), + "utf8", + ); + + await expect(supervisorTestApi?.extractSessionMeta(filePath)).resolves.toBeNull(); + }); + + it("reuses the cached rollout path for a known session before scanning the sessions tree again", async () => { + const codexHome = createTempDir(); + const cwd = createTempDir(); + process.env.CODEX_HOME = codexHome; + + const sessionsDir = join(codexHome, "sessions", "2026", "03", "20"); + await fs.mkdir(sessionsDir, { recursive: true }); + const rolloutPath = join(sessionsDir, "known-session.jsonl"); + await fs.writeFile( + rolloutPath, + JSON.stringify({ + session_meta: { + payload: { id: "known-session", cwd }, + }, + }), + "utf8", + ); + + const first = await supervisorTestApi?.findSessionBinding({ + cwd, + sinceMs: 0, + sessionId: "known-session", + }); + expect(first).toMatchObject({ + sessionId: "known-session", + rolloutPath, + }); + + const readdirSpy = vi.spyOn(fs, "readdir"); + readdirSpy.mockRejectedValue(new Error("should not rescan sessions")); + + try { + const second = await supervisorTestApi?.findSessionBinding({ + cwd, + sinceMs: 0, + sessionId: "known-session", + }); + expect(second).toMatchObject({ + sessionId: "known-session", + rolloutPath, + }); + expect(readdirSpy).not.toHaveBeenCalled(); + } finally { + readdirSpy.mockRestore(); + } + }); + + it("parses option-prefixed interactive commands consistently", () => { + expect( + supervisorTestApi?.isInteractiveCommand([ + "-c", + 'profile="dev"', + "resume", + "session-123", + ]), + ).toBe(true); + expect( + supervisorTestApi?.readResumeSessionId([ + "--config", + 'env="dev"', + "resume", + "session-123", + ]), + ).toBe("session-123"); + expect( + supervisorTestApi?.isInteractiveCommand([ + "--config=env=\"dev\"", + "fork", + ]), + ).toBe(true); + }); + + it("caches the session file listing across binding wait polls", async () => { + const codexHome = createTempDir(); + const cwd = createTempDir(); + process.env.CODEX_HOME = codexHome; + process.env.CODEX_AUTH_CLI_SESSION_BINDING_POLL_MS = "5"; + + const sessionsDir = join(codexHome, "sessions", "2026", "03", "20"); + await fs.mkdir(sessionsDir, { recursive: true }); + await fs.writeFile( + join(sessionsDir, "no-binding.jsonl"), + JSON.stringify({ type: "event", seq: 1 }), + "utf8", + ); + + const readdirSpy = vi.spyOn(fs, "readdir"); + try { + const binding = await supervisorTestApi?.waitForSessionBinding({ + cwd, + sinceMs: Date.now(), + sessionId: "missing-session", + rolloutPathHint: null, + timeoutMs: 40, + signal: undefined, + }); + expect(binding).toBeNull(); + expect(readdirSpy.mock.calls.length).toBeLessThan(10); + } finally { + readdirSpy.mockRestore(); + } + }); + + it("tries recent session entries before falling back to the full scan for known session ids", () => { + const recentEntry = { filePath: "recent.jsonl", mtimeMs: 5_000 }; + const staleEntry = { filePath: "stale.jsonl", mtimeMs: 100 }; + + expect( + supervisorTestApi?.getSessionBindingEntryPasses( + [staleEntry, recentEntry], + 4_000, + "known-session", + false, + ), + ).toEqual([[recentEntry], [recentEntry, staleEntry]]); + expect( + supervisorTestApi?.getSessionBindingEntryPasses( + [staleEntry, recentEntry], + 4_000, + "known-session", + true, + ), + ).toEqual([[recentEntry]]); + }); + + it("interrupts child restart waits when the abort signal fires", async () => { + vi.useFakeTimers(); + class FakeChild extends EventEmitter { + exitCode: number | null = null; + kill = vi.fn((_signal: string) => true); + } + + const child = new FakeChild(); + const controller = new AbortController(); + const pending = supervisorTestApi?.requestChildRestart( + child, + "win32", + controller.signal, + ); + + controller.abort(); + await vi.runAllTimersAsync(); + await expect(pending).resolves.toBeUndefined(); + expect(child.kill).toHaveBeenCalledWith("SIGTERM"); + expect(child.kill).toHaveBeenCalledWith("SIGKILL"); + }); + + it("sends SIGINT before escalating on non-Windows platforms", async () => { + vi.useFakeTimers(); + class FakeChild extends EventEmitter { + exitCode: number | null = null; + kill = vi.fn((_signal: string) => true); + } + + const child = new FakeChild(); + const pending = supervisorTestApi?.requestChildRestart(child, "linux"); + + await vi.runAllTimersAsync(); + await expect(pending).resolves.toBeUndefined(); + expect(child.kill.mock.calls.map(([signal]) => signal)).toEqual([ + "SIGINT", + "SIGTERM", + "SIGKILL", + ]); + }); + + it("uses stable snapshot cache keys without embedding refresh tokens", () => { + const cacheKey = supervisorTestApi?.getSnapshotCacheKey({ + index: 2, + accountId: "healthy", + email: "healthy@example.com", + refreshToken: "super-secret-refresh-token", + }); + expect(cacheKey).toBe("healthy|healthy@example.com|2"); + expect(cacheKey).not.toContain("super-secret-refresh-token"); + }); + + it("honors env overrides in runtime accessor fallbacks", () => { + process.env.CODEX_AUTH_CLI_SESSION_SUPERVISOR = "1"; + process.env.CODEX_AUTH_RETRY_ALL_RATE_LIMITED = "0"; + process.env.CODEX_AUTH_PREEMPTIVE_QUOTA_ENABLED = "0"; + process.env.CODEX_AUTH_PREEMPTIVE_QUOTA_5H_REMAINING_PCT = "17"; + process.env.CODEX_AUTH_PREEMPTIVE_QUOTA_7D_REMAINING_PCT = "23"; + + const accessors = supervisorTestApi?.createRuntimeConfigAccessors({}); + expect(accessors).toBeTruthy(); + expect( + accessors?.getCodexCliSessionSupervisor({ + codexCliSessionSupervisor: false, + }), + ).toBe(true); + expect( + accessors?.getRetryAllAccountsRateLimited({ + retryAllAccountsRateLimited: true, + }), + ).toBe(false); + expect( + accessors?.getPreemptiveQuotaEnabled({ + preemptiveQuotaEnabled: true, + }), + ).toBe(false); + expect( + accessors?.getPreemptiveQuotaRemainingPercent5h({ + preemptiveQuotaRemainingPercent5h: 5, + }), + ).toBe(17); + expect( + accessors?.getPreemptiveQuotaRemainingPercent7d({ + preemptiveQuotaRemainingPercent7d: 5, + }), + ).toBe(23); + }); + + it("cleans up stale supervisor locks even after a transient Windows unlink failure", async () => { + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager); + const lockPath = supervisorTestApi?.getSupervisorStorageLockPath(runtime); + expect(lockPath).toBeTruthy(); + if (!lockPath) { + throw new Error("expected a supervisor lock path"); + } + + await fs.mkdir(join(lockPath, ".."), { recursive: true }).catch(() => {}); + await fs.writeFile( + lockPath, + JSON.stringify({ + pid: 1, + acquiredAt: Date.now() - 60_000, + expiresAt: Date.now() - 1_000, + }), + "utf8", + ); + + const originalUnlink = fs.unlink.bind(fs); + const unlinkSpy = vi + .spyOn(fs, "unlink") + .mockImplementationOnce(async () => { + const error = Object.assign(new Error("file busy"), { code: "EPERM" }); + throw error; + }) + .mockImplementation(originalUnlink); + + try { + await expect( + supervisorTestApi?.withLockedManager(runtime, async (loadedManager: FakeManager) => { + expect(loadedManager).toBe(manager); + return "locked"; + }), + ).resolves.toBe("locked"); + expect(unlinkSpy.mock.calls.length).toBeGreaterThanOrEqual(2); + } finally { + unlinkSpy.mockRestore(); + } + }); + + it("refuses to delete a lock when the owner changes before cleanup", async () => { + const dir = createTempDir(); + const lockPath = join(dir, "openai-codex-accounts.json.supervisor.lock"); + await fs.writeFile( + lockPath, + JSON.stringify({ + ownerId: "new-owner", + pid: 2, + acquiredAt: Date.now(), + expiresAt: Date.now() + 60_000, + }), + "utf8", + ); + + await expect( + supervisorTestApi?.safeUnlinkOwnedSupervisorLock(lockPath, "old-owner"), + ).resolves.toBe(false); + await expect(fs.readFile(lockPath, "utf8")).resolves.toContain("new-owner"); + }); + + it.each(["EPERM", "EBUSY"] as const)( + "retries supervisor lock creation after a transient Windows %s", + async (code) => { + process.env.CODEX_AUTH_CLI_SESSION_LOCK_WAIT_MS = "1000"; + process.env.CODEX_AUTH_CLI_SESSION_LOCK_POLL_MS = "10"; + + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager); + const lockPath = supervisorTestApi?.getSupervisorStorageLockPath(runtime); + expect(lockPath).toBeTruthy(); + if (!lockPath) { + throw new Error("expected a supervisor lock path"); + } + + const originalOpen = fs.open.bind(fs); + let injectedFailure = false; + const openSpy = vi.spyOn(fs, "open").mockImplementation(async (path, flags, ...rest) => { + if (!injectedFailure && `${path}` === lockPath && flags === "wx") { + injectedFailure = true; + const error = Object.assign(new Error("transient lock create failure"), { + code, + }); + throw error; + } + return originalOpen( + path as Parameters[0], + flags as Parameters[1], + ...(rest as Parameters extends [unknown, unknown, ...infer Tail] + ? Tail + : never), + ); + }); + + try { + await expect( + supervisorTestApi?.withLockedManager(runtime, async (loadedManager: FakeManager) => { + expect(loadedManager).toBe(manager); + return "locked"; + }), + ).resolves.toBe("locked"); + expect(injectedFailure).toBe(true); + expect( + openSpy.mock.calls.filter( + ([path, flags]) => `${path}` === lockPath && flags === "wx", + ).length, + ).toBeGreaterThanOrEqual(2); + } finally { + openSpy.mockRestore(); + } + }, + ); + + it("serializes concurrent callers behind the supervisor storage lock", async () => { + process.env.CODEX_AUTH_CLI_SESSION_LOCK_WAIT_MS = "1000"; + process.env.CODEX_AUTH_CLI_SESSION_LOCK_POLL_MS = "10"; + + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager); + const order: string[] = []; + let releaseFirst: (() => void) | null = null; + let resolveFirstEntered: (() => void) | null = null; + let resolveSecondEntered: (() => void) | null = null; + const firstEntered = new Promise((resolve) => { + resolveFirstEntered = resolve; + }); + const secondEntered = new Promise((resolve) => { + resolveSecondEntered = resolve; + }); + let secondHasLock = false; + + const first = supervisorTestApi?.withLockedManager( + runtime, + async (loadedManager: FakeManager) => { + expect(loadedManager).toBe(manager); + order.push("first-enter"); + resolveFirstEntered?.(); + await new Promise((resolve) => { + releaseFirst = resolve; + }); + order.push("first-exit"); + return "first"; + }, + ); + + await firstEntered; + + const second = supervisorTestApi?.withLockedManager( + runtime, + async (loadedManager: FakeManager) => { + expect(loadedManager).toBe(manager); + secondHasLock = true; + order.push("second-enter"); + resolveSecondEntered?.(); + return "second"; + }, + ); + + await supervisorTestApi?.sleep(40); + expect(secondHasLock).toBe(false); + releaseFirst?.(); + + await secondEntered; + await expect(Promise.all([first, second])).resolves.toEqual([ + "first", + "second", + ]); + expect(order).toEqual(["first-enter", "first-exit", "second-enter"]); + }); + + it("skips a near-limit current account and selects the next healthy account", async () => { + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager); + + const result = await supervisorTestApi?.ensureLaunchableAccount( + runtime, + {}, + undefined, + { probeTimeoutMs: 250 }, + ); + + expect(result?.ok).toBe(true); + expect(result?.account?.accountId).toBe("healthy"); + expect(manager.activeIndex).toBe(1); + expect(manager.getCurrentAccountForFamily()?.accountId).toBe("healthy"); + expect(manager.getAccountByIndex(0)?.coolingDownUntil ?? 0).toBeGreaterThan( + Date.now() - 1, + ); + }); + + it("starts prewarm before the rotate threshold without forcing a cutover", async () => { + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager, { + snapshots: new Map([ + [ + "near-limit", + { + status: 200, + primary: { usedPercent: 86 }, + secondary: { usedPercent: 12 }, + }, + ], + [ + "healthy", + { + status: 200, + primary: { usedPercent: 25 }, + secondary: { usedPercent: 8 }, + }, + ], + ]), + }); + + const snapshot = await supervisorTestApi?.probeAccountSnapshot( + runtime, + manager.getCurrentAccountForFamily(), + undefined, + 250, + { useCache: false }, + ); + const pressure = supervisorTestApi?.computeQuotaPressure(snapshot, runtime, {}); + const prepared = await supervisorTestApi?.prepareResumeSelection({ + runtime, + pluginConfig: {}, + currentAccount: manager.getCurrentAccountForFamily(), + signal: undefined, + }); + + expect(pressure).toMatchObject({ + prewarm: true, + rotate: false, + remaining5h: 14, + }); + expect(manager.activeIndex).toBe(0); + expect(prepared?.nextReady).toMatchObject({ + ok: true, + account: { accountId: "healthy" }, + }); + }); + + it("aborts prepared prewarm selection when the session exits without rotating", async () => { + class FakeChild extends EventEmitter { + exitCode: number | null = null; + + constructor(exitCode: number) { + super(); + setTimeout(() => { + this.exitCode = exitCode; + this.emit("exit", exitCode, null); + }, 25); + } + + kill(_signal: string) { + return true; + } + } + + const manager = new FakeManager(); + const storageDir = createTempDir(); + let preparedProbeSignal: AbortSignal | undefined; + const runtime = { + AccountManager: { + async loadFromDisk() { + return manager; + }, + }, + getStoragePath() { + return join(storageDir, "accounts.json"); + }, + getPreemptiveQuotaEnabled() { + return true; + }, + getPreemptiveQuotaRemainingPercent5h() { + return 10; + }, + getPreemptiveQuotaRemainingPercent7d() { + return 5; + }, + getRetryAllAccountsRateLimited() { + return true; + }, + async fetchCodexQuotaSnapshot({ + accountId, + signal, + }: { + accountId: string; + signal?: AbortSignal; + }) { + if (accountId === "near-limit") { + return { + status: 200, + primary: { usedPercent: 86 }, + secondary: { usedPercent: 12 }, + }; + } + preparedProbeSignal = signal; + return await new Promise((_resolve, reject) => { + const onAbort = () => { + const error = new Error("Quota probe aborted"); + error.name = "AbortError"; + reject(error); + }; + signal?.addEventListener("abort", onAbort, { once: true }); + }); + }, + }; + + const result = await supervisorTestApi?.runInteractiveSupervision({ + codexBin: "dist/bin/codex.js", + initialArgs: ["resume", "prewarm-clean-exit"], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + runtime, + pluginConfig: {}, + manager, + signal: undefined, + maxSessionRestarts: 1, + spawnChild: () => new FakeChild(0), + findBinding: async ({ sessionId }: { sessionId?: string }) => ({ + sessionId: sessionId ?? "prewarm-clean-exit", + rolloutPath: null, + lastActivityAtMs: Date.now(), + }), + }); + + expect(result).toBe(0); + expect(preparedProbeSignal?.aborted).toBe(true); + }); + + it("reuses a cached healthy snapshot within the short ttl", async () => { + process.env.CODEX_AUTH_CLI_SESSION_SNAPSHOT_CACHE_TTL_MS = "5000"; + const manager = new FakeManager(); + const calls: string[] = []; + const runtime = createFakeRuntime(manager, { + onFetch(accountId) { + calls.push(accountId); + }, + }); + const account = manager.getCurrentAccountForFamily(); + + await supervisorTestApi?.clearProbeSnapshotCache(account); + const first = await supervisorTestApi?.probeAccountSnapshot( + runtime, + account, + undefined, + 250, + ); + const second = await supervisorTestApi?.probeAccountSnapshot( + runtime, + account, + undefined, + 250, + ); + + expect(first).toEqual(second); + expect(calls).toEqual(["near-limit"]); + }); + + it("shares the same in-flight probe across concurrent callers", async () => { + process.env.CODEX_AUTH_CLI_SESSION_SNAPSHOT_CACHE_TTL_MS = "5000"; + const manager = new FakeManager(); + const calls: string[] = []; + const nearLimitGate = createDeferred(); + const probeStarted = createDeferred(); + const runtime = createFakeRuntime(manager, { + waitForFetchByAccountId: new Map([["near-limit", nearLimitGate.promise]]), + onFetch(accountId) { + calls.push(accountId); + }, + onFetchStart(accountId) { + if (accountId === "near-limit") { + probeStarted.resolve(); + } + }, + }); + const account = manager.getCurrentAccountForFamily(); + + await supervisorTestApi?.clearProbeSnapshotCache(account); + const first = supervisorTestApi?.probeAccountSnapshot( + runtime, + account, + undefined, + 250, + ); + await probeStarted.promise; + const second = supervisorTestApi?.probeAccountSnapshot( + runtime, + account, + undefined, + 250, + ); + expect(calls).toEqual(["near-limit"]); + + nearLimitGate.resolve(); + const [firstSnapshot, secondSnapshot] = await Promise.all([first, second]); + expect(firstSnapshot).toEqual(secondSnapshot); + expect(calls).toEqual(["near-limit"]); + }); + + it("starts selection probing before restart finishes in overlap mode", async () => { + process.env.CODEX_AUTH_CLI_SESSION_SIGNAL_TIMEOUT_MS = "40"; + + class FakeChild extends EventEmitter { + exitCode: number | null = null; + kill = vi.fn((_signal: string) => true); + } + + const overlapManager = new FakeManager(); + const nearLimitGate = createDeferred(); + const probeStarted = createDeferred(); + const overlapRuntime = createFakeRuntime(overlapManager, { + waitForFetchByAccountId: new Map([["near-limit", nearLimitGate.promise]]), + onFetchStart(accountId) { + if (accountId === "near-limit") { + probeStarted.resolve(); + } + }, + }); + const overlapChild = new FakeChild(); + let restartFinished = false; + const restartPromise = supervisorTestApi?.requestChildRestart( + overlapChild, + "win32", + ).then(() => { + restartFinished = true; + }); + const selectionPromise = supervisorTestApi?.ensureLaunchableAccount( + overlapRuntime, + {}, + undefined, + { probeTimeoutMs: 250 }, + ); + await probeStarted.promise; + expect(restartFinished).toBe(false); + nearLimitGate.resolve(); + + await expect(Promise.all([ + restartPromise, + selectionPromise, + ])).resolves.toEqual([ + undefined, + expect.objectContaining({ + ok: true, + account: expect.objectContaining({ accountId: "healthy" }), + }), + ]); + }); + + it("uses the prepared account without re-probing at cutover time", async () => { + process.env.CODEX_AUTH_CLI_SESSION_SIGNAL_TIMEOUT_MS = "40"; + + class FakeChild extends EventEmitter { + exitCode: number | null = null; + kill = vi.fn((_signal: string) => true); + } + + const calls: string[] = []; + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager, { + quotaProbeDelayMs: 80, + onFetch(accountId) { + calls.push(accountId); + }, + }); + + const prepared = await supervisorTestApi?.prepareResumeSelection({ + runtime, + pluginConfig: {}, + currentAccount: manager.getCurrentAccountForFamily(), + restartDecision: { + reason: "quota-near-exhaustion", + waitMs: 0, + sessionId: "prepared-session", + }, + signal: undefined, + }); + expect(prepared?.nextReady).toMatchObject({ + ok: true, + account: { accountId: "healthy" }, + }); + expect(calls).toEqual(["healthy"]); + + calls.length = 0; + await supervisorTestApi?.markCurrentAccountForRestart( + runtime, + manager.getCurrentAccountForFamily(), + { + reason: "quota-near-exhaustion", + waitMs: 0, + sessionId: "prepared-session", + }, + undefined, + ); + await supervisorTestApi?.requestChildRestart(new FakeChild(), "win32"); + const committed = await supervisorTestApi?.commitPreparedSelection( + runtime, + prepared?.nextReady?.account, + undefined, + ); + expect(committed).toMatchObject({ + ok: true, + account: { accountId: "healthy" }, + }); + expect(calls).toEqual([]); + }); + + it("commits the prepared account after the stored token refreshes before cutover", async () => { + process.env.CODEX_AUTH_CLI_SESSION_SIGNAL_TIMEOUT_MS = "40"; + + class FakeChild extends EventEmitter { + exitCode: number | null = null; + kill = vi.fn((_signal: string) => { + setTimeout(() => { + this.exitCode = 0; + this.emit("exit", 0, null); + }, 0); + return true; + }); + } + + const calls: string[] = []; + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager, { + onFetch(accountId) { + calls.push(accountId); + }, + }); + + const prepared = await supervisorTestApi?.prepareResumeSelection({ + runtime, + pluginConfig: {}, + currentAccount: manager.getCurrentAccountForFamily(), + restartDecision: { + reason: "quota-near-exhaustion", + waitMs: 0, + sessionId: "prepared-session", + }, + signal: undefined, + }); + const stalePreparedAccount = prepared?.nextReady?.account + ? { ...prepared.nextReady.account } + : null; + expect(stalePreparedAccount).toMatchObject({ + accountId: "healthy", + refreshToken: "rt-healthy", + }); + expect(calls).toEqual(["healthy"]); + + await supervisorTestApi?.markCurrentAccountForRestart( + runtime, + manager.getCurrentAccountForFamily(), + { + reason: "quota-near-exhaustion", + waitMs: 0, + sessionId: "prepared-session", + }, + undefined, + ); + const refreshedStoredAccount = manager.getAccountByIndex(1); + expect(refreshedStoredAccount).not.toBeNull(); + if (!refreshedStoredAccount) { + return; + } + refreshedStoredAccount.refreshToken = "rt-healthy-refreshed"; + refreshedStoredAccount.access = "token-2-refreshed"; + await supervisorTestApi?.requestChildRestart(new FakeChild(), "win32"); + + const committed = await supervisorTestApi?.commitPreparedSelection( + runtime, + stalePreparedAccount, + undefined, + ); + + expect(committed).toMatchObject({ + ok: true, + account: { + accountId: "healthy", + refreshToken: "rt-healthy-refreshed", + access: "token-2-refreshed", + }, + }); + expect(calls).toEqual(["healthy"]); + }); + + it("commits the prepared account only at cutover time", async () => { + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager, { + quotaProbeDelayMs: 40, + }); + + const prepared = await supervisorTestApi?.prepareResumeSelection({ + runtime, + pluginConfig: {}, + currentAccount: manager.getCurrentAccountForFamily(), + signal: undefined, + }); + expect(manager.activeIndex).toBe(0); + await supervisorTestApi?.markCurrentAccountForRestart( + runtime, + manager.getCurrentAccountForFamily(), + { + reason: "quota-near-exhaustion", + waitMs: 0, + sessionId: "prepared-session", + }, + undefined, + ); + + const committed = await supervisorTestApi?.commitPreparedSelection( + runtime, + prepared?.nextReady?.account, + undefined, + ); + + expect(committed).toMatchObject({ + ok: true, + account: { accountId: "healthy" }, + }); + expect(manager.activeIndex).toBe(1); + }); + + it("preserves caller CLI options when rebuilding resume args after rotation", async () => { + class FakeChild extends EventEmitter { + exitCode: number | null = null; + + constructor(exitCode: number) { + super(); + setTimeout(() => { + this.exitCode = exitCode; + this.emit("exit", exitCode, null); + }, 0); + } + + kill(_signal: string) { + return true; + } + } + + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager); + const spawnedArgs: string[][] = []; + const exitCodes = [1, 0]; + + const result = await supervisorTestApi?.runInteractiveSupervision({ + codexBin: "dist/bin/codex.js", + initialArgs: [ + "-c", + 'profile="dev"', + "resume", + "seed-session", + "-c", + 'cli_auth_credentials_store="file"', + ], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + runtime, + pluginConfig: {}, + manager, + signal: undefined, + maxSessionRestarts: 2, + spawnChild: (_codexBin: string, args: string[]) => { + spawnedArgs.push([...args]); + return new FakeChild(exitCodes.shift() ?? 0); + }, + findBinding: async ({ sessionId }: { sessionId?: string }) => + sessionId + ? { + sessionId, + rolloutPath: null, + lastActivityAtMs: Date.now(), + } + : null, + }); + + expect(result).toBe(0); + expect(spawnedArgs).toEqual([ + [ + "-c", + 'profile="dev"', + "resume", + "seed-session", + "-c", + 'cli_auth_credentials_store="file"', + ], + [ + "-c", + 'profile="dev"', + "resume", + "seed-session", + "-c", + 'cli_auth_credentials_store="file"', + ], + ]); + }); + + it("starts degraded candidate probes in the same batch before waiting on results", async () => { + const manager = new FakeManager([ + { accountId: "degraded-1", access: "token-1" }, + { accountId: "degraded-2", access: "token-2" }, + { accountId: "degraded-3", access: "token-3" }, + { accountId: "healthy", access: "token-4" }, + ]); + const degradedProbeGate = createDeferred(); + const firstProbeStarted = createDeferred(); + const startedAccounts = new Set(); + const runtime = createFakeRuntime(manager, { + quotaProbeDelayMs: 70, + waitForFetchByAccountId: new Map([ + ["degraded-1", degradedProbeGate.promise], + ["degraded-2", degradedProbeGate.promise], + ["degraded-3", degradedProbeGate.promise], + ]), + onFetchStart(accountId) { + if (accountId.startsWith("degraded-")) { + startedAccounts.add(accountId); + if (startedAccounts.size === 1) { + firstProbeStarted.resolve(); + } + } + }, + snapshots: new Map([ + [ + "degraded-1", + { status: 200, primary: { usedPercent: 93 }, secondary: { usedPercent: 12 } }, + ], + [ + "degraded-2", + { status: 200, primary: { usedPercent: 94 }, secondary: { usedPercent: 14 } }, + ], + [ + "degraded-3", + { status: 200, primary: { usedPercent: 95 }, secondary: { usedPercent: 11 } }, + ], + [ + "healthy", + { status: 200, primary: { usedPercent: 18 }, secondary: { usedPercent: 7 } }, + ], + ]), + }); + + const pendingResult = supervisorTestApi?.ensureLaunchableAccount( + runtime, + {}, + undefined, + { probeTimeoutMs: 250 }, + ); + await firstProbeStarted.promise; + for (let attempt = 0; attempt < 6 && startedAccounts.size < 3; attempt += 1) { + await new Promise((resolve) => setImmediate(resolve)); + } + expect([...startedAccounts].sort()).toEqual([ + "degraded-1", + "degraded-2", + "degraded-3", + ]); + degradedProbeGate.resolve(); + const result = await pendingResult; + + expect(result).toMatchObject({ + ok: true, + account: { accountId: "healthy" }, + }); + expect(manager.activeIndex).toBe(3); + }); + + it("bypasses supervisor account gating for auth commands before account selection", async () => { + const loadFromDisk = vi.fn(async () => { + throw new Error("ensureLaunchableAccount should not run for bypass commands"); + }); + const forwardToRealCodex = vi.fn(async () => 0); + + await expect( + supervisorTestApi?.runCodexSupervisorWithRuntime({ + codexBin: "dist/bin/codex.js", + rawArgs: ["auth"], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + forwardToRealCodex, + runtime: { + loadPluginConfig: () => ({ codexCliSessionSupervisor: true }), + getCodexCliSessionSupervisor: () => true, + AccountManager: { loadFromDisk }, + }, + signal: undefined, + }), + ).resolves.toBe(0); + expect(forwardToRealCodex).toHaveBeenCalledWith("dist/bin/codex.js", [ + "auth", + ]); + expect(loadFromDisk).not.toHaveBeenCalled(); + }); + + it.each([ + ["--help"], + ["--version"], + ])( + "bypasses supervisor account gating for top-level %s flags before account selection", + async (flag) => { + const loadFromDisk = vi.fn(async () => { + throw new Error("ensureLaunchableAccount should not run for top-level help/version"); + }); + const forwardToRealCodex = vi.fn(async () => 0); + + await expect( + supervisorTestApi?.runCodexSupervisorWithRuntime({ + codexBin: "dist/bin/codex.js", + rawArgs: [flag], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + forwardToRealCodex, + runtime: { + loadPluginConfig: () => ({ codexCliSessionSupervisor: true }), + getCodexCliSessionSupervisor: () => true, + AccountManager: { loadFromDisk }, + }, + signal: undefined, + }), + ).resolves.toBe(0); + expect(forwardToRealCodex).toHaveBeenCalledWith("dist/bin/codex.js", [flag]); + expect(loadFromDisk).not.toHaveBeenCalled(); + }, + ); + + it("does not bypass supervisor account gating for nested version flags after --", async () => { + const loadFromDisk = vi.fn(async () => { + throw new Error("ensureLaunchableAccount reached"); + }); + const forwardToRealCodex = vi.fn(async () => 0); + + await expect( + supervisorTestApi?.runCodexSupervisorWithRuntime({ + codexBin: "dist/bin/codex.js", + rawArgs: ["exec", "--", "--version"], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + forwardToRealCodex, + runtime: { + loadPluginConfig: () => ({ codexCliSessionSupervisor: true }), + getCodexCliSessionSupervisor: () => true, + AccountManager: { loadFromDisk }, + }, + signal: undefined, + }), + ).rejects.toThrow("ensureLaunchableAccount reached"); + expect(forwardToRealCodex).not.toHaveBeenCalled(); + }); + + it( + "returns 1 when interactive supervision is already at the restart safety limit", + async () => { + const manager = new FakeManager([ + { accountId: "near-limit", access: "token-1" }, + { accountId: "healthy", access: "token-2" }, + ]); + const runtime = createFakeRuntime(manager); + const spawnChild = vi.fn(); + + const result = await supervisorTestApi?.runInteractiveSupervision({ + codexBin: "dist/bin/codex.js", + initialArgs: ["resume", "session-restart-limit"], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + runtime, + pluginConfig: {}, + manager, + signal: undefined, + maxSessionRestarts: 0, + spawnChild, + }); + + expect(result).toBe(1); + expect(spawnChild).not.toHaveBeenCalled(); + }, + ); + + it( + "stops waiting on the child when the outer signal aborts", + { timeout: 10_000 }, + async () => { + process.env.CODEX_AUTH_CLI_SESSION_SIGNAL_TIMEOUT_MS = "10"; + + class HangingChild extends EventEmitter { + exitCode: number | null = null; + killSignals: string[] = []; + kill = vi.fn((signal: string) => { + this.killSignals.push(signal); + setTimeout(() => { + this.exitCode = 130; + this.emit("exit", 130, signal); + }, 0); + return true; + }); + } + + const manager = new FakeManager(); + const runtime = { + ...createFakeRuntime(manager), + getPreemptiveQuotaEnabled() { + return false; + }, + }; + const controller = new AbortController(); + const child = new HangingChild(); + const runPromise = supervisorTestApi?.runInteractiveSupervision({ + codexBin: "dist/bin/codex.js", + initialArgs: ["chat"], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + runtime, + pluginConfig: {}, + manager, + signal: controller.signal, + maxSessionRestarts: 1, + spawnChild: () => child, + }); + + setTimeout(() => controller.abort(), 10); + + await expect(runPromise).resolves.toBe(130); + expect(child.kill).toHaveBeenCalled(); + expect(child.killSignals.length).toBeGreaterThan(0); + }, + ); + + it("cleans up the parent abort listener for each linked abort controller", () => { + const controller = new AbortController(); + const addSpy = vi.spyOn(controller.signal, "addEventListener"); + const removeSpy = vi.spyOn(controller.signal, "removeEventListener"); + + try { + const first = supervisorTestApi?.createLinkedAbortController( + controller.signal, + ); + first?.cleanup(); + const second = supervisorTestApi?.createLinkedAbortController( + controller.signal, + ); + second?.cleanup(); + + expect(addSpy).toHaveBeenCalledTimes(2); + expect(removeSpy).toHaveBeenCalledTimes(2); + } finally { + addSpy.mockRestore(); + removeSpy.mockRestore(); + } + }); + + it.each([ + [ + "throws", + () => { + throw new Error("boom"); + }, + /Failed to resolve supervisor storage path via runtime\.getStoragePath\(\): boom/, + ], + [ + "returns empty whitespace", + () => " ", + /Failed to resolve supervisor storage path via runtime\.getStoragePath\(\): received an empty path/, + ], + ])( + "does not fall back to the default Codex home lock path when runtime.getStoragePath %s", + async (_label, getStoragePath, expectedError) => { + const codexHome = createTempDir(); + process.env.CODEX_HOME = codexHome; + const loadFromDisk = vi.fn(async () => new FakeManager()); + const runtime = { + AccountManager: { loadFromDisk }, + getStoragePath, + }; + const defaultLockPath = join( + codexHome, + "multi-auth", + "openai-codex-accounts.json.supervisor.lock", + ); + + await expect( + supervisorTestApi?.withLockedManager(runtime, async () => "ok", undefined), + ).rejects.toThrow(expectedError); + expect(loadFromDisk).not.toHaveBeenCalled(); + await expect(fs.access(defaultLockPath)).rejects.toMatchObject({ + code: "ENOENT", + }); + }, + ); + + it("renews the supervisor storage lock while a critical section is still running", async () => { + process.env.CODEX_AUTH_CLI_SESSION_LOCK_TTL_MS = "40"; + process.env.CODEX_AUTH_CLI_SESSION_LOCK_POLL_MS = "5"; + process.env.CODEX_AUTH_CLI_SESSION_LOCK_WAIT_MS = "500"; + + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager); + const firstEntered = createDeferred(); + const releaseFirst = createDeferred(); + let firstReleased = false; + let secondEntered = false; + + const first = supervisorTestApi?.withLockedManager( + runtime, + async () => { + firstEntered.resolve(); + await releaseFirst.promise; + firstReleased = true; + return "first"; + }, + undefined, + ); + await firstEntered.promise; + + const second = supervisorTestApi?.withLockedManager( + runtime, + async () => { + secondEntered = true; + expect(firstReleased).toBe(true); + return "second"; + }, + undefined, + ); + + await new Promise((resolve) => setTimeout(resolve, 140)); + expect(secondEntered).toBe(false); + + releaseFirst.resolve(); + await expect(Promise.all([first, second])).resolves.toEqual([ + "first", + "second", + ]); + }); + + it( + "fails fast when the supervisor lock heartbeat loses the lease mid-section", + { timeout: 10_000 }, + async () => { + process.env.CODEX_AUTH_CLI_SESSION_LOCK_TTL_MS = "30"; + process.env.CODEX_AUTH_CLI_SESSION_LOCK_WAIT_MS = "200"; + + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager); + const entered = createDeferred(); + const observedAbort = createDeferred(); + let firstExited = false; + let secondEntered = false; + const criticalSection = supervisorTestApi?.withLockedManager( + runtime, + async (_freshManager, lockSignal) => { + entered.resolve(); + await new Promise((resolve) => { + lockSignal?.addEventListener( + "abort", + () => { + observedAbort.resolve(); + resolve(); + }, + { once: true }, + ); + }); + firstExited = true; + return "held"; + }, + undefined, + ); + await entered.promise; + + const lockPath = supervisorTestApi?.getSupervisorStorageLockPath(runtime); + expect(lockPath).toBeTruthy(); + if (!lockPath) { + return; + } + await fs.unlink(lockPath); + await observedAbort.promise; + + const secondSection = supervisorTestApi?.withLockedManager( + runtime, + async () => { + secondEntered = true; + expect(firstExited).toBe(true); + return "second"; + }, + undefined, + ); + + await expect(criticalSection).rejects.toThrow( + `Supervisor lock heartbeat lost lease at ${lockPath} for owner`, + ); + await expect(secondSection).resolves.toBe("second"); + expect(secondEntered).toBe(true); + }, + ); + + it( + "returns a failure exit code when the monitor loop fails after startup", + { timeout: 10_000 }, + async () => { + class FakeChild extends EventEmitter { + exitCode: number | null = null; + + constructor(exitCode: number) { + super(); + setTimeout(() => { + this.exitCode = exitCode; + this.emit("exit", exitCode, null); + }, 25); + } + + kill(_signal: string) { + return true; + } + } + + const stderrSpy = vi + .spyOn(process.stderr, "write") + .mockImplementation(() => true); + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager); + + try { + const result = await supervisorTestApi?.runInteractiveSupervision({ + codexBin: "dist/bin/codex.js", + initialArgs: ["resume", "monitor-failure-session"], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + runtime, + pluginConfig: {}, + manager, + signal: undefined, + maxSessionRestarts: 1, + spawnChild: () => new FakeChild(0), + findBinding: async ({ sessionId }: { sessionId?: string }) => ({ + sessionId: sessionId ?? "monitor-failure-session", + rolloutPath: null, + lastActivityAtMs: Date.now(), + }), + loadCurrentState: async () => { + throw new Error("Timed out waiting for supervisor storage lock"); + }, + }); + + expect(result).toBe(1); + expect(stderrSpy).toHaveBeenCalledWith( + expect.stringContaining("monitor loop failed: Timed out waiting for supervisor storage lock"), + ); + } finally { + stderrSpy.mockRestore(); + } + }, + ); + + it("does not cool down accounts when the quota probe is unavailable", async () => { + const manager = new FakeManager(); + const runtime = createFakeRuntime(manager, { + onFetch(accountId) { + if (accountId === "near-limit") { + const error = new Error("quota probe unavailable"); + error.name = "QuotaProbeUnavailableError"; + throw error; + } + }, + }); + + const result = await supervisorTestApi?.ensureLaunchableAccount( + runtime, + {}, + undefined, + { probeTimeoutMs: 250 }, + ); + + expect(result).toMatchObject({ + ok: true, + account: { accountId: "healthy" }, + }); + expect(manager.getAccountByIndex(0)?.coolingDownUntil).toBe(0); + expect(manager.activeIndex).toBe(1); + }); + + it( + "paces repeated quota probe outages instead of hot-looping the monitor", + { timeout: 10_000 }, + async () => { + process.env.CODEX_AUTH_CLI_SESSION_SUPERVISOR_POLL_MS = "30"; + + class HangingChild extends EventEmitter { + exitCode: number | null = null; + kill = vi.fn((signal: string) => { + setTimeout(() => { + this.exitCode = 130; + this.emit("exit", 130, signal); + }, 0); + return true; + }); + } + + const manager = new FakeManager(); + const controller = new AbortController(); + let fetchAttempts = 0; + const runtime = createFakeRuntime(manager, { + onFetch(accountId) { + if (accountId === "near-limit") { + fetchAttempts += 1; + if (fetchAttempts === 2) { + controller.abort(); + } + throw new Error("quota endpoint unavailable"); + } + }, + }); + const child = new HangingChild(); + + const runPromise = supervisorTestApi?.runInteractiveSupervision({ + codexBin: "dist/bin/codex.js", + initialArgs: ["chat"], + buildForwardArgs: (rawArgs: string[]) => [...rawArgs], + runtime, + pluginConfig: {}, + manager, + signal: controller.signal, + maxSessionRestarts: 1, + spawnChild: () => child, + loadCurrentState: async () => ({ + manager, + currentAccount: manager.getCurrentAccountForFamily(), + }), + waitForBinding: async () => ({ + sessionId: "probe-unavailable-session", + rolloutPath: null, + lastActivityAtMs: Date.now(), + }), + refreshBinding: async (binding: { + sessionId: string; + rolloutPath: string | null; + lastActivityAtMs: number; + }) => binding, + }); + + for (let attempt = 0; attempt < 20 && fetchAttempts === 0; attempt += 1) { + await new Promise((resolve) => setTimeout(resolve, 5)); + } + expect(fetchAttempts).toBe(1); + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(fetchAttempts).toBe(1); + await expect(runPromise).rejects.toMatchObject({ + name: "AbortError", + message: "Supervisor storage lock wait aborted", + }); + expect(fetchAttempts).toBe(2); + expect(child.kill).toHaveBeenCalled(); + }, + ); + + it("degrades a failed candidate probe and continues to the next healthy account", async () => { + const manager = new FakeManager([ + { accountId: "broken", access: "token-broken" }, + { accountId: "healthy", access: "token-healthy" }, + ]); + const runtime = createFakeRuntime(manager, { + onFetch(accountId) { + if (accountId === "broken") { + throw new Error("network fault"); + } + }, + }); + + const result = await supervisorTestApi?.ensureLaunchableAccount( + runtime, + {}, + undefined, + { probeTimeoutMs: 250 }, + ); + + expect(result).toMatchObject({ + ok: true, + account: { accountId: "healthy" }, + }); + expect(manager.activeIndex).toBe(1); + }); +}); diff --git a/test/plugin-config.test.ts b/test/plugin-config.test.ts index 9caebf96..588d259b 100644 --- a/test/plugin-config.test.ts +++ b/test/plugin-config.test.ts @@ -21,6 +21,7 @@ import { getPreemptiveQuotaRemainingPercent5h, getPreemptiveQuotaRemainingPercent7d, getPreemptiveQuotaMaxDeferralMs, + getCodexCliSessionSupervisor, } from '../lib/config.js'; import type { PluginConfig } from '../lib/types.js'; import * as fs from 'node:fs'; @@ -67,6 +68,7 @@ describe('Plugin Configuration', () => { 'CODEX_AUTH_PREEMPTIVE_QUOTA_5H_REMAINING_PCT', 'CODEX_AUTH_PREEMPTIVE_QUOTA_7D_REMAINING_PCT', 'CODEX_AUTH_PREEMPTIVE_QUOTA_MAX_DEFERRAL_MS', + 'CODEX_AUTH_CLI_SESSION_SUPERVISOR', ] as const; const originalEnv: Partial> = {}; @@ -139,6 +141,7 @@ describe('Plugin Configuration', () => { preemptiveQuotaRemainingPercent5h: 5, preemptiveQuotaRemainingPercent7d: 5, preemptiveQuotaMaxDeferralMs: 2 * 60 * 60_000, + codexCliSessionSupervisor: false, }); // existsSync is called with multiple candidate config paths (primary + legacy fallbacks) expect(mockExistsSync).toHaveBeenCalled(); @@ -197,6 +200,7 @@ describe('Plugin Configuration', () => { preemptiveQuotaRemainingPercent5h: 5, preemptiveQuotaRemainingPercent7d: 5, preemptiveQuotaMaxDeferralMs: 2 * 60 * 60_000, + codexCliSessionSupervisor: false, }); }); @@ -452,6 +456,7 @@ describe('Plugin Configuration', () => { preemptiveQuotaRemainingPercent5h: 5, preemptiveQuotaRemainingPercent7d: 5, preemptiveQuotaMaxDeferralMs: 2 * 60 * 60_000, + codexCliSessionSupervisor: false, }); }); @@ -516,6 +521,7 @@ describe('Plugin Configuration', () => { preemptiveQuotaRemainingPercent5h: 5, preemptiveQuotaRemainingPercent7d: 5, preemptiveQuotaMaxDeferralMs: 2 * 60 * 60_000, + codexCliSessionSupervisor: false, }); expect(mockLogWarn).toHaveBeenCalled(); }); @@ -574,6 +580,7 @@ describe('Plugin Configuration', () => { preemptiveQuotaRemainingPercent5h: 5, preemptiveQuotaRemainingPercent7d: 5, preemptiveQuotaMaxDeferralMs: 2 * 60 * 60_000, + codexCliSessionSupervisor: false, }); expect(mockLogWarn).toHaveBeenCalled(); }); @@ -980,5 +987,24 @@ describe('Plugin Configuration', () => { expect(getPreemptiveQuotaMaxDeferralMs({ preemptiveQuotaMaxDeferralMs: 2_000 })).toBe(123000); }); }); + + describe('CLI session supervisor setting', () => { + it('should default the supervisor wrapper to disabled', () => { + expect(getCodexCliSessionSupervisor({})).toBe(false); + }); + + it('should honor the config value when the env override is unset', () => { + delete process.env.CODEX_AUTH_CLI_SESSION_SUPERVISOR; + expect( + getCodexCliSessionSupervisor({ codexCliSessionSupervisor: true }), + ).toBe(true); + }); + + it('should prioritize environment override for the supervisor wrapper', () => { + process.env.CODEX_AUTH_CLI_SESSION_SUPERVISOR = '1'; + expect(getCodexCliSessionSupervisor({ codexCliSessionSupervisor: false })).toBe(true); + delete process.env.CODEX_AUTH_CLI_SESSION_SUPERVISOR; + }); + }); }); diff --git a/test/quota-probe.test.ts b/test/quota-probe.test.ts index 96e9605f..5db377ff 100644 --- a/test/quota-probe.test.ts +++ b/test/quota-probe.test.ts @@ -170,6 +170,63 @@ describe("quota-probe", () => { await assertion; expect(fetchMock).toHaveBeenCalledTimes(1); }); + + it("aborts immediately when the caller abort signal fires", async () => { + const controller = new AbortController(); + let markFetchStarted!: () => void; + const fetchStarted = new Promise((resolve) => { + markFetchStarted = resolve; + }); + const fetchMock = vi.fn((_url: string, init?: RequestInit) => { + return new Promise((_resolve, reject) => { + init?.signal?.addEventListener( + "abort", + () => { + const error = new Error("aborted"); + (error as Error & { name?: string }).name = "AbortError"; + reject(error); + }, + { once: true }, + ); + markFetchStarted(); + }); + }); + vi.stubGlobal("fetch", fetchMock); + + const pending = fetchCodexQuotaSnapshot({ + accountId: "acc-abort", + accessToken: "token-abort", + model: "gpt-5-codex", + fallbackModels: [], + timeoutMs: 30_000, + signal: controller.signal, + }); + + await fetchStarted; + controller.abort(); + + await expect(pending).rejects.toThrow(/abort/i); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it("rejects immediately when the caller signal is already aborted", async () => { + const controller = new AbortController(); + controller.abort(); + const fetchMock = vi.fn(); + vi.stubGlobal("fetch", fetchMock); + + await expect( + fetchCodexQuotaSnapshot({ + accountId: "acc-pre-aborted", + accessToken: "token-pre-aborted", + model: "gpt-5-codex", + fallbackModels: [], + timeoutMs: 30_000, + signal: controller.signal, + }), + ).rejects.toThrow(/abort/i); + expect(fetchMock).not.toHaveBeenCalled(); + }); it("parses reset-at values expressed as epoch seconds and epoch milliseconds", async () => { const nowSec = Math.floor(Date.now() / 1000); const primarySeconds = nowSec + 120; diff --git a/test/settings-hub-utils.test.ts b/test/settings-hub-utils.test.ts index 6e0cc6de..2bdb82b9 100644 --- a/test/settings-hub-utils.test.ts +++ b/test/settings-hub-utils.test.ts @@ -1,4 +1,4 @@ -import { mkdtempSync, rmSync } from "node:fs"; +import { mkdtempSync, promises as fs } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; @@ -160,6 +160,31 @@ const originalCodeHome = process.env.CODEX_HOME; const originalCodeMultiAuthDir = process.env.CODEX_MULTI_AUTH_DIR; const originalConfigPath = process.env.CODEX_MULTI_AUTH_CONFIG_PATH; +async function removeDirectoryWithRetry(dir: string): Promise { + const retryableCodes = new Set([ + "ENOTEMPTY", + "EPERM", + "EBUSY", + "EACCES", + "EAGAIN", + ]); + for (let attempt = 1; attempt <= 6; attempt += 1) { + try { + await fs.rm(dir, { recursive: true, force: true }); + return; + } catch (error) { + const code = + error && typeof error === "object" && "code" in error + ? `${error.code ?? ""}` + : ""; + if (!retryableCodes.has(code) || attempt === 6) { + throw error; + } + await new Promise((resolve) => setTimeout(resolve, attempt * 50)); + } + } +} + async function loadSettingsHubTestApi(): Promise { const module = await import("../lib/codex-manager/settings-hub.js"); return module.__testOnly as SettingsHubTestApi; @@ -193,9 +218,6 @@ beforeEach(() => { afterEach(() => { vi.restoreAllMocks(); vi.resetModules(); - if (tempRoot.length > 0) { - rmSync(tempRoot, { recursive: true, force: true }); - } if (originalCodeHome === undefined) { delete process.env.CODEX_HOME; } else { @@ -215,6 +237,12 @@ afterEach(() => { restoreStreamIsTTY(process.stdout, originalStdoutDescriptor); }); +afterEach(async () => { + if (tempRoot.length > 0) { + await removeDirectoryWithRetry(tempRoot); + } +}); + describe("settings-hub utility coverage", () => { it("clamps backend numeric settings by option bounds", async () => { const api = await loadSettingsHubTestApi(); @@ -717,10 +745,22 @@ describe("settings-hub utility coverage", () => { expect(selected?.proactiveRefreshIntervalMs).toBe(60_000); }); + it("toggles the CLI session supervisor in experimental settings", async () => { + const api = await loadSettingsHubTestApi(); + queueSelectResults( + { type: "toggle-session-supervisor" }, + { type: "save" }, + ); + const selected = await api.promptExperimentalSettings({ + codexCliSessionSupervisor: false, + }); + expect(selected?.codexCliSessionSupervisor).toBe(true); + }); + it("supports experimental submenu hotkeys for guardian toggle and interval increase", async () => { const api = await loadSettingsHubTestApi(); queueSelectResults( - triggerSettingsHubHotkey("3"), + triggerSettingsHubHotkey("4"), triggerSettingsHubHotkey("]"), triggerSettingsHubHotkey("s"), ); @@ -748,8 +788,14 @@ describe("settings-hub utility coverage", () => { it("maps experimental menu and status hotkeys including numeric and uppercase variants", async () => { const api = await loadSettingsHubTestApi(); - expect(api.mapExperimentalMenuHotkey("1")).toEqual({ type: "sync" }); - expect(api.mapExperimentalMenuHotkey("2")).toEqual({ type: "backup" }); + expect(api.mapExperimentalMenuHotkey("1")).toEqual({ + type: "toggle-session-supervisor", + }); + expect(api.mapExperimentalMenuHotkey("2")).toEqual({ type: "sync" }); + expect(api.mapExperimentalMenuHotkey("3")).toEqual({ type: "backup" }); + expect(api.mapExperimentalMenuHotkey("4")).toEqual({ + type: "toggle-refresh-guardian", + }); expect(api.mapExperimentalMenuHotkey("[")).toEqual({ type: "decrease-refresh-interval", });