diff --git a/src/core/UsageStatsService.js b/src/core/UsageStatsService.js index d9cf3a1..27d5094 100644 --- a/src/core/UsageStatsService.js +++ b/src/core/UsageStatsService.js @@ -16,6 +16,7 @@ class UsageStatsService { this.statsFilePath = path.join(this.dataDir, "usage-stats.jsonl"); this.enabled = enabled !== false; this.appendPromise = Promise.resolve(); + this.isImportingStats = false; if (this.enabled) { // Ensure data directory exists @@ -131,6 +132,12 @@ class UsageStatsService { if (!tracker) return null; this.activeRequests.delete(requestId); + if (this.isImportingStats) { + if (this.logger) { + this.logger.info(`[UsageStats] Dropped request ${requestId} because stats import is in progress.`); + } + return null; + } const finishedAtMs = Date.now(); const lastAttempt = tracker.attempts[tracker.attempts.length - 1] || null; @@ -240,28 +247,45 @@ class UsageStatsService { }; } - _loadFromFile() { - if (!this.enabled) return; - try { - if (!fs.existsSync(this.statsFilePath)) return; + /** + * Public import entry point. Drops newly finished stats during import, waits + * for already queued appends, then rewrites the stats file from a stable baseline. + */ + importJsonl(content) { + if (!this.enabled) { + throw new Error("Usage stats are disabled"); + } + if (this.isImportingStats) { + throw new Error("Usage stats import is already in progress"); + } + if (typeof content !== "string") { + throw new Error("Invalid JSONL content"); + } - const lines = fs.readFileSync(this.statsFilePath, "utf-8").split("\n").filter(Boolean); - this.records = []; - this.sequence = 0; + this.isImportingStats = true; - for (const line of lines) { - try { - const record = this._normalizeLoadedRecord(JSON.parse(line)); - this.records.push(record); - if (record.sequence > this.sequence) { - this.sequence = record.sequence; - } - } catch { - // Skip malformed lines - } - } + const importPromise = this.appendPromise + .catch(() => {}) + .then(() => this._importJsonlContent(content)) + .finally(() => { + this.isImportingStats = false; + }); + + this.appendPromise = importPromise.catch(() => {}); + return importPromise; + } + + /** + * Load persisted JSONL records during startup and rebuild derived in-memory + * aggregates from the records that can be parsed. + */ + _loadFromFile() { + try { + const { records } = this._readRecordsFromFile(); + if (records.length === 0 && !fs.existsSync(this.statsFilePath)) return; // Recalculate aggregates from all loaded records + this._replaceRecords(records); this._recalculateFromRecords(); if (this.logger) { @@ -287,9 +311,191 @@ class UsageStatsService { }); } - _recalculateFromRecords() { - this.startedAtMs = Date.now(); - this.startedAt = new Date(this.startedAtMs).toISOString(); + /** + * Import JSONL content, deduplicate by requestId, merge with current records, + * rewrite the file in finishedAt order, and rebuild memory state. + */ + async _importJsonlContent(content) { + if (!fs.existsSync(this.dataDir)) { + fs.mkdirSync(this.dataDir, { recursive: true }); + } + + const { records: existingRecords } = await this._readRecordsFromFileAsync(); + const uniqueExistingRecords = []; + const seenRequestIds = new Set(); + let duplicateCount = 0; + let missingRequestIdCount = 0; + + for (const record of existingRecords) { + const requestId = this._normalizeRequestId(record.requestId); + if (!requestId) { + missingRequestIdCount += 1; + continue; + } + + record.requestId = requestId; + if (seenRequestIds.has(requestId)) { + duplicateCount += 1; + continue; + } + + seenRequestIds.add(requestId); + uniqueExistingRecords.push(record); + } + + const importedRecords = []; + let invalidLineCount = 0; + const lines = content.split(/\r?\n/); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + + let record; + try { + record = this._normalizeLoadedRecord(JSON.parse(trimmed)); + } catch { + invalidLineCount += 1; + continue; + } + + const requestId = this._normalizeRequestId(record.requestId); + if (!requestId) { + missingRequestIdCount += 1; + continue; + } + record.requestId = requestId; + + if (seenRequestIds.has(requestId)) { + duplicateCount += 1; + continue; + } + + seenRequestIds.add(requestId); + importedRecords.push(record); + } + + const mergedRecords = this._normalizeImportedRecords(uniqueExistingRecords.concat(importedRecords)); + const fileContent = mergedRecords.map(record => JSON.stringify(record)).join("\n"); + await fs.promises.writeFile(this.statsFilePath, fileContent ? `${fileContent}\n` : "", "utf-8"); + this._replaceRecords(mergedRecords); + this._recalculateFromRecords({ resetStartedAt: false }); + + if (this.logger) { + this.logger.info( + `[UsageStats] Imported ${importedRecords.length} records from JSONL. ` + + `Skipped ${duplicateCount} duplicates, ${invalidLineCount} invalid lines, ` + + `${missingRequestIdCount} without requestId.` + ); + } + + return { + duplicateCount, + importedCount: importedRecords.length, + invalidLineCount, + missingRequestIdCount, + totalRecords: mergedRecords.length, + }; + } + + /** + * Read valid records from the persisted usage-stats JSONL file. Malformed + * lines are ignored so one bad line does not prevent loading the rest. + */ + async _readRecordsFromFileAsync() { + try { + const content = await fs.promises.readFile(this.statsFilePath, "utf-8"); + return { records: this._parseRecordsContent(content) }; + } catch (error) { + if (error?.code === "ENOENT") { + return { records: [] }; + } + + throw error; + } + } + + _readRecordsFromFile() { + try { + const content = fs.readFileSync(this.statsFilePath, "utf-8"); + return { records: this._parseRecordsContent(content) }; + } catch (error) { + if (error?.code === "ENOENT") { + return { records: [] }; + } + + throw error; + } + } + + _parseRecordsContent(content) { + const records = []; + const lines = content.split(/\r?\n/).filter(Boolean); + for (const line of lines) { + try { + records.push(this._normalizeLoadedRecord(JSON.parse(line))); + } catch { + // Skip malformed lines + } + } + + return records; + } + + /** + * Normalize imported data by finished time and assign continuous sequence + * numbers from 1..N after records from multiple files have been merged. + */ + _normalizeImportedRecords(records) { + return records + .map((record, originalIndex) => ({ originalIndex, record })) + .sort((a, b) => { + const aTime = this._getRecordSortTime(a.record); + const bTime = this._getRecordSortTime(b.record); + if (aTime !== bTime) return aTime - bTime; + return a.originalIndex - b.originalIndex; + }) + .map((item, index) => ({ + ...item.record, + sequence: index + 1, + })); + } + + /** + * Return the timestamp used for import ordering: finishedAt first, then + * startedAt, then the previous sequence as a final fallback. + */ + _getRecordSortTime(record) { + const finishedAtMs = Date.parse(record?.finishedAt); + if (Number.isFinite(finishedAtMs)) return finishedAtMs; + + const startedAtMs = Date.parse(record?.startedAt); + if (Number.isFinite(startedAtMs)) return startedAtMs; + + const sequence = Number(record?.sequence); + return Number.isFinite(sequence) ? sequence : Number.MAX_SAFE_INTEGER; + } + + /** + * Replace the in-memory record list and reset sequence to the highest record + * sequence so new records continue after the current data set. + */ + _replaceRecords(records) { + this.records = records; + this.sequence = 0; + for (const record of records) { + if (record.sequence > this.sequence) { + this.sequence = record.sequence; + } + } + } + + _recalculateFromRecords(options = {}) { + const { resetStartedAt = true } = options; + if (resetStartedAt) { + this.startedAtMs = Date.now(); + this.startedAt = new Date(this.startedAtMs).toISOString(); + } this.summary = { abortedCount: 0, errorCount: 0, @@ -400,6 +606,15 @@ class UsageStatsService { }; } + /** + * Normalize requestId for import deduplication. Non-string IDs are treated + * as missing so they cannot be used as merge keys. + */ + _normalizeRequestId(value) { + if (typeof value !== "string") return ""; + return value.trim(); + } + _normalizeAuthIndex(value) { return Number.isInteger(value) && value >= 0 ? value : null; } diff --git a/src/routes/StatusRoutes.js b/src/routes/StatusRoutes.js index 8540742..bfc2570 100644 --- a/src/routes/StatusRoutes.js +++ b/src/routes/StatusRoutes.js @@ -182,6 +182,71 @@ class StatusRoutes { res.json(snapshot || UsageStatsService.createEmptySnapshot()); }); + app.get("/api/usage-stats/download", isAuthenticated, async (req, res) => { + try { + const usageStatsService = this.serverSystem.usageStatsService; + if (!usageStatsService?.enabled) { + return res.status(403).json({ message: "usageStatsDisabled" }); + } + if (usageStatsService.isImportingStats) { + return res.status(409).json({ message: "usageStatsImportInProgress" }); + } + const statsFilePath = + usageStatsService?.statsFilePath || path.join(process.cwd(), "data", "usage-stats.jsonl"); + + if (usageStatsService?.appendPromise) { + await usageStatsService.appendPromise.catch(() => {}); + } + + if (!fs.existsSync(statsFilePath)) { + return res.status(404).json({ message: "usageStatsDownloadNoData" }); + } + + if (req.query.check === "1") { + return res.json({ ok: true }); + } + + res.setHeader("Content-Type", "application/x-ndjson; charset=utf-8"); + res.sendFile(statsFilePath); + } catch (error) { + this.logger.error(`[WebUI] Failed to download usage stats: ${error.message}`); + res.status(500).json({ error: error.message, message: "usageStatsDownloadFailed" }); + } + }); + + app.post("/api/usage-stats/import", isAuthenticated, async (req, res) => { + try { + const usageStatsService = this.serverSystem.usageStatsService; + if (!usageStatsService?.enabled) { + return res.status(403).json({ message: "usageStatsDisabled" }); + } + if (usageStatsService.isImportingStats) { + return res.status(409).json({ message: "usageStatsImportInProgress" }); + } + + const { content, filename } = req.body || {}; + if (typeof filename !== "string" || !filename.toLowerCase().endsWith(".jsonl")) { + return res.status(400).json({ message: "usageStatsImportJsonlOnly" }); + } + if (typeof content !== "string") { + return res.status(400).json({ message: "usageStatsImportInvalidFile" }); + } + + const result = await usageStatsService.importJsonl(content); + res.json({ + duplicateCount: result.duplicateCount, + importedCount: result.importedCount, + invalidLineCount: result.invalidLineCount, + message: "usageStatsImportSuccess", + missingRequestIdCount: result.missingRequestIdCount, + totalRecords: result.totalRecords, + }); + } catch (error) { + this.logger.error(`[WebUI] Failed to import usage stats: ${error.message}`); + res.status(500).json({ error: error.message, message: "usageStatsImportFailed" }); + } + }); + app.put("/api/accounts/current", isAuthenticated, async (req, res) => { try { if (this._rejectIfSystemBusy(res)) return; diff --git a/ui/app/pages/StatusPage.vue b/ui/app/pages/StatusPage.vue index 1f96d47..7699af5 100644 --- a/ui/app/pages/StatusPage.vue +++ b/ui/app/pages/StatusPage.vue @@ -1541,6 +1541,61 @@