From bd57f3b461a1267acb46157bf5fc74d5c8702e32 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 22 May 2026 16:06:49 +0000 Subject: [PATCH] Use Effect idioms in diagnostics and keyed worker Co-authored-by: Julius Marminge --- .../ProcessResourceMonitor.test.ts | 86 +++++++++++++------ .../src/diagnostics/ProcessResourceMonitor.ts | 65 ++++++++------ .../shared/src/KeyedCoalescingWorker.test.ts | 30 +++++-- packages/shared/src/KeyedCoalescingWorker.ts | 46 ++++++---- 4 files changed, 153 insertions(+), 74 deletions(-) diff --git a/apps/server/src/diagnostics/ProcessResourceMonitor.test.ts b/apps/server/src/diagnostics/ProcessResourceMonitor.test.ts index 11d12c012db..78871cf3f4f 100644 --- a/apps/server/src/diagnostics/ProcessResourceMonitor.test.ts +++ b/apps/server/src/diagnostics/ProcessResourceMonitor.test.ts @@ -1,5 +1,6 @@ -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, it } from "@effect/vitest"; import * as DateTime from "effect/DateTime"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Option from "effect/Option"; @@ -60,10 +61,16 @@ describe("ProcessResourceMonitor", () => { ], }); - expect(samples.map((sample) => sample.pid)).toEqual([100, 101, 102]); - expect(samples.map((sample) => sample.depth)).toEqual([0, 1, 2]); - expect(samples[0]?.isServerRoot).toBe(true); - expect(samples[1]?.isServerRoot).toBe(false); + assert.deepStrictEqual( + samples.map((sample) => sample.pid), + [100, 101, 102], + ); + assert.deepStrictEqual( + samples.map((sample) => sample.depth), + [0, 1, 2], + ); + assert.equal(samples[0]?.isServerRoot, true); + assert.equal(samples[1]?.isServerRoot, false); }), ); @@ -112,18 +119,21 @@ describe("ProcessResourceMonitor", () => { samples, readAt: secondAt, readAtMs: DateTime.toEpochMillis(secondAt), - windowMs: 60_000, - bucketMs: 10_000, - lastError: null, + windowMs: Duration.toMillis(Duration.minutes(1)), + bucketMs: Duration.toMillis(Duration.seconds(10)), + lastError: Option.none(), }); - expect(Option.isNone(result.error)).toBe(true); - expect(result.topProcesses).toHaveLength(1); - expect(result.topProcesses[0]?.avgCpuPercent).toBe(20); - expect(result.topProcesses[0]?.maxCpuPercent).toBe(30); - expect(result.topProcesses[0]?.cpuSecondsApprox).toBe(2); - expect(result.totalCpuSecondsApprox).toBe(2); - expect(result.buckets.some((bucket) => bucket.maxCpuPercent === 30)).toBe(true); + assert.equal(Option.isNone(result.error), true); + assert.equal(result.topProcesses.length, 1); + assert.equal(result.topProcesses[0]?.avgCpuPercent, 20); + assert.equal(result.topProcesses[0]?.maxCpuPercent, 30); + assert.equal(result.topProcesses[0]?.cpuSecondsApprox, 2); + assert.equal(result.totalCpuSecondsApprox, 2); + assert.equal( + result.buckets.some((bucket) => bucket.maxCpuPercent === 30), + true, + ); }), ); @@ -172,15 +182,15 @@ describe("ProcessResourceMonitor", () => { samples, readAt: secondAt, readAtMs: DateTime.toEpochMillis(secondAt), - windowMs: 60_000, - bucketMs: 10_000, - lastError: null, + windowMs: Duration.toMillis(Duration.minutes(1)), + bucketMs: Duration.toMillis(Duration.seconds(10)), + lastError: Option.none(), }); - expect(result.topProcesses).toHaveLength(1); - expect(result.topProcesses[0]?.isServerRoot).toBe(true); - expect(result.topProcesses[0]?.sampleCount).toBe(2); - expect(result.topProcesses[0]?.maxRssBytes).toBe(2_000); + assert.equal(result.topProcesses.length, 1); + assert.equal(result.topProcesses[0]?.isServerRoot, true); + assert.equal(result.topProcesses[0]?.sampleCount, 2); + assert.equal(result.topProcesses[0]?.maxRssBytes, 2_000); }), ); @@ -219,13 +229,35 @@ describe("ProcessResourceMonitor", () => { samples, readAt: sampledAt, readAtMs: DateTime.toEpochMillis(sampledAt), - windowMs: 60_000, - bucketMs: 10_000, - lastError: null, + windowMs: Duration.toMillis(Duration.minutes(1)), + bucketMs: Duration.toMillis(Duration.seconds(10)), + lastError: Option.none(), }); - expect(result.topProcesses).toHaveLength(36); - expect(result.topProcesses.some((process) => process.command === "worker 34")).toBe(true); + assert.equal(result.topProcesses.length, 36); + assert.equal( + result.topProcesses.some((process) => process.command === "worker 34"), + true, + ); + }), + ); + + it.effect("maps the latest sampling error option into the response", () => + Effect.sync(() => { + const readAt = DateTime.makeUnsafe("2026-05-05T10:00:00.000Z"); + const result = aggregateProcessResourceHistory({ + samples: [], + readAt, + readAtMs: DateTime.toEpochMillis(readAt), + windowMs: Duration.toMillis(Duration.minutes(1)), + bucketMs: Duration.toMillis(Duration.seconds(10)), + lastError: Option.some("ps failed"), + }); + + if (Option.isNone(result.error)) { + assert.fail("Expected response error"); + } + assert.deepStrictEqual(result.error.value, { message: "ps failed" }); }), ); }); diff --git a/apps/server/src/diagnostics/ProcessResourceMonitor.ts b/apps/server/src/diagnostics/ProcessResourceMonitor.ts index 2b6dfe8d362..d5528b81af8 100644 --- a/apps/server/src/diagnostics/ProcessResourceMonitor.ts +++ b/apps/server/src/diagnostics/ProcessResourceMonitor.ts @@ -6,10 +6,12 @@ import type { } from "@t3tools/contracts"; import * as Context from "effect/Context"; import * as DateTime from "effect/DateTime"; +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Ref from "effect/Ref"; +import * as Schedule from "effect/Schedule"; import { ChildProcessSpawner } from "effect/unstable/process"; import { @@ -19,8 +21,11 @@ import { readProcessRows, } from "./ProcessDiagnostics.ts"; -const SAMPLE_INTERVAL_MS = 5_000; -const RETENTION_MS = 60 * 60_000; +const SAMPLE_INTERVAL = Duration.seconds(5); +const RETENTION = Duration.minutes(60); +const MIN_HISTORY_DURATION = Duration.seconds(1); +const SAMPLE_INTERVAL_MS = Duration.toMillis(SAMPLE_INTERVAL); +const RETENTION_MS = Duration.toMillis(RETENTION); const MAX_RETAINED_SAMPLES = 20_000; export interface ProcessResourceSample { @@ -38,7 +43,7 @@ export interface ProcessResourceSample { interface MonitorState { readonly samples: ReadonlyArray; - readonly lastError: string | null; + readonly lastError: Option.Option; } export interface ProcessResourceMonitorShape { @@ -60,8 +65,11 @@ function sampleKey(row: Pick): string { return `${row.pid}:${row.command}`; } -function findServerRootRow(rows: ReadonlyArray, serverPid: number): ProcessRow | null { - return rows.find((row) => row.pid === serverPid) ?? null; +function findServerRootRow( + rows: ReadonlyArray, + serverPid: number, +): Option.Option { + return Option.fromUndefinedOr(rows.find((row) => row.pid === serverPid)); } export function collectMonitoredSamples(input: { @@ -75,16 +83,16 @@ export function collectMonitoredSamples(input: { const descendants = buildDescendantEntries(rows, input.serverPid); const samples: ProcessResourceSample[] = []; - if (root) { + if (Option.isSome(root)) { samples.push({ sampledAt: input.sampledAt, sampledAtMs: input.sampledAtMs, - processKey: sampleKey(root), - pid: root.pid, - ppid: root.ppid, - command: root.command, - cpuPercent: root.cpuPercent, - rssBytes: root.rssBytes, + processKey: sampleKey(root.value), + pid: root.value.pid, + ppid: root.value.ppid, + command: root.value.command, + cpuPercent: root.value.cpuPercent, + rssBytes: root.value.rssBytes, depth: 0, isServerRoot: true, }); @@ -166,11 +174,12 @@ function summarizeProcesses( function buildBuckets(input: { readonly samples: ReadonlyArray; readonly nowMs: number; - readonly windowMs: number; - readonly bucketMs: number; + readonly window: Duration.Duration; + readonly bucket: Duration.Duration; }): ReadonlyArray { - const bucketMs = Math.max(1_000, input.bucketMs); - const windowStartMs = input.nowMs - input.windowMs; + const bucketMs = Duration.toMillis(Duration.max(MIN_HISTORY_DURATION, input.bucket)); + const windowMs = Duration.toMillis(input.window); + const windowStartMs = input.nowMs - windowMs; const buckets: ServerProcessResourceHistoryBucket[] = []; for (let startedAtMs = windowStartMs; startedAtMs < input.nowMs; startedAtMs += bucketMs) { @@ -220,10 +229,12 @@ export function aggregateProcessResourceHistory(input: { readonly readAtMs: number; readonly windowMs: number; readonly bucketMs: number; - readonly lastError: string | null; + readonly lastError: Option.Option; }): ServerProcessResourceHistoryResult { - const windowMs = Math.max(1_000, input.windowMs); - const bucketMs = Math.max(1_000, input.bucketMs); + const window = Duration.max(MIN_HISTORY_DURATION, Duration.millis(input.windowMs)); + const bucket = Duration.max(MIN_HISTORY_DURATION, Duration.millis(input.bucketMs)); + const windowMs = Duration.toMillis(window); + const bucketMs = Duration.toMillis(bucket); const minSampledAtMs = input.readAtMs - windowMs; const samples = input.samples.filter((sample) => sample.sampledAtMs >= minSampledAtMs); const topProcesses = summarizeProcesses(samples); @@ -239,15 +250,15 @@ export function aggregateProcessResourceHistory(input: { sampleIntervalMs: SAMPLE_INTERVAL_MS, retainedSampleCount: input.samples.length, totalCpuSecondsApprox, - buckets: buildBuckets({ samples, nowMs: input.readAtMs, windowMs, bucketMs }), + buckets: buildBuckets({ samples, nowMs: input.readAtMs, window, bucket }), topProcesses, - error: input.lastError ? Option.some({ message: input.lastError }) : Option.none(), + error: Option.map(input.lastError, (message) => ({ message })), }; } export const make = Effect.fn("makeProcessResourceMonitor")(function* () { const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; - const state = yield* Ref.make({ samples: [], lastError: null }); + const state = yield* Ref.make({ samples: [], lastError: Option.none() }); const sampleOnce = Effect.gen(function* () { const sampledAt = yield* DateTime.now; @@ -263,20 +274,20 @@ export const make = Effect.fn("makeProcessResourceMonitor")(function* () { }); yield* Ref.update(state, (current) => ({ samples: trimSamples([...current.samples, ...samples], sampledAtMs), - lastError: null, + lastError: Option.none(), })); }).pipe( Effect.catch((error: unknown) => Ref.update(state, (current) => ({ ...current, - lastError: error instanceof Error ? error.message : "Failed to sample process resources.", + lastError: Option.some( + error instanceof Error ? error.message : "Failed to sample process resources.", + ), })), ), ); - yield* Effect.forever(sampleOnce.pipe(Effect.andThen(Effect.sleep(SAMPLE_INTERVAL_MS)))).pipe( - Effect.forkScoped, - ); + yield* sampleOnce.pipe(Effect.repeat(Schedule.spaced(SAMPLE_INTERVAL)), Effect.forkScoped); const readHistory: ProcessResourceMonitorShape["readHistory"] = (input) => Effect.gen(function* () { diff --git a/packages/shared/src/KeyedCoalescingWorker.test.ts b/packages/shared/src/KeyedCoalescingWorker.test.ts index a010d732310..1e5274656df 100644 --- a/packages/shared/src/KeyedCoalescingWorker.test.ts +++ b/packages/shared/src/KeyedCoalescingWorker.test.ts @@ -1,5 +1,4 @@ -import { it } from "@effect/vitest"; -import { describe, expect } from "vitest"; +import { assert, describe, it } from "@effect/vitest"; import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; @@ -47,12 +46,12 @@ describe("makeKeyedCoalescingWorker", () => { yield* Deferred.succeed(releaseFirst, undefined); yield* Deferred.await(secondStarted); - expect(yield* Deferred.isDone(drained)).toBe(false); + assert.equal(yield* Deferred.isDone(drained), false); yield* Deferred.succeed(releaseSecond, undefined); yield* Deferred.await(drained); - expect(processed).toEqual(["terminal-1:first", "terminal-1:second"]); + assert.deepStrictEqual(processed, ["terminal-1:first", "terminal-1:second"]); }), ), ); @@ -90,7 +89,28 @@ describe("makeKeyedCoalescingWorker", () => { yield* Deferred.await(secondProcessed); yield* worker.drainKey("terminal-1"); - expect(processed).toEqual(["terminal-1:first", "terminal-1:second"]); + assert.deepStrictEqual(processed, ["terminal-1:first", "terminal-1:second"]); + }), + ), + ); + + it.live("treats an undefined value as present queued work", () => + Effect.scoped( + Effect.gen(function* () { + const processed: Array = []; + + const worker = yield* makeKeyedCoalescingWorker({ + merge: (_current, next) => next, + process: (_key, value) => + Effect.sync(() => { + processed.push(value); + }), + }); + + yield* worker.enqueue("terminal-1", undefined); + yield* worker.drainKey("terminal-1"); + + assert.deepStrictEqual(processed, [undefined]); }), ), ); diff --git a/packages/shared/src/KeyedCoalescingWorker.ts b/packages/shared/src/KeyedCoalescingWorker.ts index f4edebfafb3..c8b52ce6176 100644 --- a/packages/shared/src/KeyedCoalescingWorker.ts +++ b/packages/shared/src/KeyedCoalescingWorker.ts @@ -9,6 +9,7 @@ */ import * as Scope from "effect/Scope"; import * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; import * as TxQueue from "effect/TxQueue"; import * as TxRef from "effect/TxRef"; @@ -23,6 +24,9 @@ interface KeyedCoalescingWorkerState { readonly activeKeys: Set; } +const getMapValue = (map: ReadonlyMap, key: K): Option.Option => + map.has(key) ? Option.some(map.get(key) as V) : Option.none(); + export const makeKeyedCoalescingWorker = (options: { readonly merge: (current: V, next: V) => V; readonly process: (key: K, value: V) => Effect.Effect; @@ -39,11 +43,11 @@ export const makeKeyedCoalescingWorker = (options: { options.process(key, value).pipe( Effect.flatMap(() => TxRef.modify(stateRef, (state) => { - const nextValue = state.latestByKey.get(key); - if (nextValue === undefined) { + const nextValue = getMapValue(state.latestByKey, key); + if (Option.isNone(nextValue)) { const activeKeys = new Set(state.activeKeys); activeKeys.delete(key); - return [null, { ...state, activeKeys }] as const; + return [Option.none(), { ...state, activeKeys }] as const; } const latestByKey = new Map(state.latestByKey); @@ -52,7 +56,10 @@ export const makeKeyedCoalescingWorker = (options: { }).pipe(Effect.tx), ), Effect.flatMap((nextValue) => - nextValue === null ? Effect.void : processKey(key, nextValue), + Option.match(nextValue, { + onNone: () => Effect.void, + onSome: (value) => processKey(key, value), + }), ), ); @@ -81,9 +88,12 @@ export const makeKeyedCoalescingWorker = (options: { const queuedKeys = new Set(state.queuedKeys); queuedKeys.delete(key); - const value = state.latestByKey.get(key); - if (value === undefined) { - return [null, { ...state, queuedKeys }] as const; + const value = getMapValue(state.latestByKey, key); + if (Option.isNone(value)) { + return [ + Option.none<{ readonly key: K; readonly value: V }>(), + { ...state, queuedKeys }, + ] as const; } const latestByKey = new Map(state.latestByKey); @@ -92,17 +102,17 @@ export const makeKeyedCoalescingWorker = (options: { activeKeys.add(key); return [ - { key, value } as const, + Option.some({ key, value: value.value }), { ...state, latestByKey, queuedKeys, activeKeys }, ] as const; }).pipe(Effect.tx), ), Effect.flatMap((item) => - item === null - ? Effect.void - : processKey(item.key, item.value).pipe( - Effect.catchCause(() => cleanupFailedKey(item.key)), - ), + Option.match(item, { + onNone: () => Effect.void, + onSome: ({ key, value }) => + processKey(key, value).pipe(Effect.catchCause(() => cleanupFailedKey(key))), + }), ), Effect.forever, Effect.forkScoped, @@ -111,8 +121,14 @@ export const makeKeyedCoalescingWorker = (options: { const enqueue: KeyedCoalescingWorker["enqueue"] = (key, value) => TxRef.modify(stateRef, (state) => { const latestByKey = new Map(state.latestByKey); - const existing = latestByKey.get(key); - latestByKey.set(key, existing === undefined ? value : options.merge(existing, value)); + const existing = getMapValue(latestByKey, key); + latestByKey.set( + key, + Option.match(existing, { + onNone: () => value, + onSome: (current) => options.merge(current, value), + }), + ); if (state.queuedKeys.has(key) || state.activeKeys.has(key)) { return [false, { ...state, latestByKey }] as const;