Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 236 additions & 21 deletions src/core/UsageStatsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class UsageStatsService {
this.statsFilePath = path.join(this.dataDir, "usage-stats.jsonl");
this.enabled = enabled !== false;
this.appendPromise = Promise.resolve();
this.isImportingStats = false;

if (this.enabled) {
// Ensure data directory exists
Expand Down Expand Up @@ -131,6 +132,12 @@ class UsageStatsService {
if (!tracker) return null;

this.activeRequests.delete(requestId);
if (this.isImportingStats) {
if (this.logger) {
this.logger.info(`[UsageStats] Dropped request ${requestId} because stats import is in progress.`);
}
return null;
}

const finishedAtMs = Date.now();
const lastAttempt = tracker.attempts[tracker.attempts.length - 1] || null;
Expand Down Expand Up @@ -240,28 +247,45 @@ class UsageStatsService {
};
}

_loadFromFile() {
if (!this.enabled) return;
try {
if (!fs.existsSync(this.statsFilePath)) return;
/**
* Public import entry point. Drops newly finished stats during import, waits
* for already queued appends, then rewrites the stats file from a stable baseline.
*/
importJsonl(content) {
if (!this.enabled) {
throw new Error("Usage stats are disabled");
}
if (this.isImportingStats) {
throw new Error("Usage stats import is already in progress");
}
if (typeof content !== "string") {
throw new Error("Invalid JSONL content");
}

const lines = fs.readFileSync(this.statsFilePath, "utf-8").split("\n").filter(Boolean);
this.records = [];
this.sequence = 0;
this.isImportingStats = true;

for (const line of lines) {
try {
const record = this._normalizeLoadedRecord(JSON.parse(line));
this.records.push(record);
if (record.sequence > this.sequence) {
this.sequence = record.sequence;
}
} catch {
// Skip malformed lines
}
}
const importPromise = this.appendPromise
.catch(() => {})
.then(() => this._importJsonlContent(content))
.finally(() => {
this.isImportingStats = false;
});

this.appendPromise = importPromise.catch(() => {});
return importPromise;
}

/**
* Load persisted JSONL records during startup and rebuild derived in-memory
* aggregates from the records that can be parsed.
*/
_loadFromFile() {
try {
const { records } = this._readRecordsFromFile();
if (records.length === 0 && !fs.existsSync(this.statsFilePath)) return;

// Recalculate aggregates from all loaded records
this._replaceRecords(records);
this._recalculateFromRecords();

if (this.logger) {
Expand All @@ -287,9 +311,191 @@ class UsageStatsService {
});
}

_recalculateFromRecords() {
this.startedAtMs = Date.now();
this.startedAt = new Date(this.startedAtMs).toISOString();
/**
* Import JSONL content, deduplicate by requestId, merge with current records,
* rewrite the file in finishedAt order, and rebuild memory state.
*/
async _importJsonlContent(content) {
if (!fs.existsSync(this.dataDir)) {
fs.mkdirSync(this.dataDir, { recursive: true });
}

const { records: existingRecords } = await this._readRecordsFromFileAsync();
const uniqueExistingRecords = [];
const seenRequestIds = new Set();
let duplicateCount = 0;
let missingRequestIdCount = 0;

for (const record of existingRecords) {
const requestId = this._normalizeRequestId(record.requestId);
if (!requestId) {
missingRequestIdCount += 1;
continue;
}

record.requestId = requestId;
if (seenRequestIds.has(requestId)) {
duplicateCount += 1;
continue;
}

seenRequestIds.add(requestId);
uniqueExistingRecords.push(record);
}

const importedRecords = [];
let invalidLineCount = 0;
const lines = content.split(/\r?\n/);

for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;

let record;
try {
record = this._normalizeLoadedRecord(JSON.parse(trimmed));
} catch {
invalidLineCount += 1;
continue;
}

const requestId = this._normalizeRequestId(record.requestId);
if (!requestId) {
missingRequestIdCount += 1;
continue;
}
record.requestId = requestId;

if (seenRequestIds.has(requestId)) {
duplicateCount += 1;
continue;
}

seenRequestIds.add(requestId);
importedRecords.push(record);
}

const mergedRecords = this._normalizeImportedRecords(uniqueExistingRecords.concat(importedRecords));
const fileContent = mergedRecords.map(record => JSON.stringify(record)).join("\n");
await fs.promises.writeFile(this.statsFilePath, fileContent ? `${fileContent}\n` : "", "utf-8");
Comment thread
bbbugg marked this conversation as resolved.
this._replaceRecords(mergedRecords);
this._recalculateFromRecords({ resetStartedAt: false });
Comment thread
bbbugg marked this conversation as resolved.

if (this.logger) {
this.logger.info(
`[UsageStats] Imported ${importedRecords.length} records from JSONL. ` +
`Skipped ${duplicateCount} duplicates, ${invalidLineCount} invalid lines, ` +
`${missingRequestIdCount} without requestId.`
);
}

return {
duplicateCount,
importedCount: importedRecords.length,
invalidLineCount,
missingRequestIdCount,
totalRecords: mergedRecords.length,
};
}

/**
* Read valid records from the persisted usage-stats JSONL file. Malformed
* lines are ignored so one bad line does not prevent loading the rest.
*/
async _readRecordsFromFileAsync() {
try {
const content = await fs.promises.readFile(this.statsFilePath, "utf-8");
return { records: this._parseRecordsContent(content) };
} catch (error) {
if (error?.code === "ENOENT") {
return { records: [] };
}

throw error;
}
}

_readRecordsFromFile() {
try {
const content = fs.readFileSync(this.statsFilePath, "utf-8");
return { records: this._parseRecordsContent(content) };
} catch (error) {
if (error?.code === "ENOENT") {
return { records: [] };
}

throw error;
}
}

_parseRecordsContent(content) {
const records = [];
const lines = content.split(/\r?\n/).filter(Boolean);
for (const line of lines) {
try {
Comment thread
bbbugg marked this conversation as resolved.
records.push(this._normalizeLoadedRecord(JSON.parse(line)));
} catch {
// Skip malformed lines
}
}

return records;
}

/**
* Normalize imported data by finished time and assign continuous sequence
* numbers from 1..N after records from multiple files have been merged.
*/
_normalizeImportedRecords(records) {
return records
.map((record, originalIndex) => ({ originalIndex, record }))
.sort((a, b) => {
const aTime = this._getRecordSortTime(a.record);
const bTime = this._getRecordSortTime(b.record);
if (aTime !== bTime) return aTime - bTime;
return a.originalIndex - b.originalIndex;
})
.map((item, index) => ({
...item.record,
sequence: index + 1,
}));
}

/**
* Return the timestamp used for import ordering: finishedAt first, then
* startedAt, then the previous sequence as a final fallback.
*/
_getRecordSortTime(record) {
const finishedAtMs = Date.parse(record?.finishedAt);
if (Number.isFinite(finishedAtMs)) return finishedAtMs;

const startedAtMs = Date.parse(record?.startedAt);
if (Number.isFinite(startedAtMs)) return startedAtMs;

const sequence = Number(record?.sequence);
return Number.isFinite(sequence) ? sequence : Number.MAX_SAFE_INTEGER;
}

/**
* Replace the in-memory record list and reset sequence to the highest record
* sequence so new records continue after the current data set.
*/
_replaceRecords(records) {
this.records = records;
this.sequence = 0;
for (const record of records) {
if (record.sequence > this.sequence) {
this.sequence = record.sequence;
}
}
}

_recalculateFromRecords(options = {}) {
const { resetStartedAt = true } = options;
if (resetStartedAt) {
this.startedAtMs = Date.now();
this.startedAt = new Date(this.startedAtMs).toISOString();
}
this.summary = {
abortedCount: 0,
errorCount: 0,
Expand Down Expand Up @@ -400,6 +606,15 @@ class UsageStatsService {
};
}

/**
* Normalize requestId for import deduplication. Non-string IDs are treated
* as missing so they cannot be used as merge keys.
*/
_normalizeRequestId(value) {
if (typeof value !== "string") return "";
return value.trim();
}

_normalizeAuthIndex(value) {
return Number.isInteger(value) && value >= 0 ? value : null;
}
Expand Down
65 changes: 65 additions & 0 deletions src/routes/StatusRoutes.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,71 @@ class StatusRoutes {
res.json(snapshot || UsageStatsService.createEmptySnapshot());
});

app.get("/api/usage-stats/download", isAuthenticated, async (req, res) => {
try {
const usageStatsService = this.serverSystem.usageStatsService;
if (!usageStatsService?.enabled) {
return res.status(403).json({ message: "usageStatsDisabled" });
}
if (usageStatsService.isImportingStats) {
return res.status(409).json({ message: "usageStatsImportInProgress" });
}
const statsFilePath =
usageStatsService?.statsFilePath || path.join(process.cwd(), "data", "usage-stats.jsonl");

if (usageStatsService?.appendPromise) {
await usageStatsService.appendPromise.catch(() => {});
}

if (!fs.existsSync(statsFilePath)) {
return res.status(404).json({ message: "usageStatsDownloadNoData" });
}

if (req.query.check === "1") {
return res.json({ ok: true });
}

res.setHeader("Content-Type", "application/x-ndjson; charset=utf-8");
Comment thread
bbbugg marked this conversation as resolved.
res.sendFile(statsFilePath);
Comment thread
bbbugg marked this conversation as resolved.
} catch (error) {
this.logger.error(`[WebUI] Failed to download usage stats: ${error.message}`);
res.status(500).json({ error: error.message, message: "usageStatsDownloadFailed" });
}
});

app.post("/api/usage-stats/import", isAuthenticated, async (req, res) => {
try {
const usageStatsService = this.serverSystem.usageStatsService;
if (!usageStatsService?.enabled) {
return res.status(403).json({ message: "usageStatsDisabled" });
}
if (usageStatsService.isImportingStats) {
return res.status(409).json({ message: "usageStatsImportInProgress" });
}

const { content, filename } = req.body || {};
if (typeof filename !== "string" || !filename.toLowerCase().endsWith(".jsonl")) {
return res.status(400).json({ message: "usageStatsImportJsonlOnly" });
}
if (typeof content !== "string") {
return res.status(400).json({ message: "usageStatsImportInvalidFile" });
}
Comment thread
bbbugg marked this conversation as resolved.

const result = await usageStatsService.importJsonl(content);
res.json({
Comment thread
bbbugg marked this conversation as resolved.
duplicateCount: result.duplicateCount,
importedCount: result.importedCount,
invalidLineCount: result.invalidLineCount,
message: "usageStatsImportSuccess",
missingRequestIdCount: result.missingRequestIdCount,
totalRecords: result.totalRecords,
});
} catch (error) {
this.logger.error(`[WebUI] Failed to import usage stats: ${error.message}`);
res.status(500).json({ error: error.message, message: "usageStatsImportFailed" });
}
});

app.put("/api/accounts/current", isAuthenticated, async (req, res) => {
try {
if (this._rejectIfSystemBusy(res)) return;
Expand Down
Loading
Loading