diff --git a/package.json b/package.json index 9d9f902f..7039f1b7 100644 --- a/package.json +++ b/package.json @@ -23,10 +23,12 @@ "main": "src/index.ts", "scripts": { "dev": "node -r ts-node/register src/index.ts", + "dev:dashboard": "node -r ts-node/register src/dashboard-service/index.ts", "clean": "rimraf ./{dist,.nyc_output,.test-reports,.coverage}", "build": "tsc --project tsconfig.build.json", "prestart": "npm run build", "start": "cd dist && node src/index.js", + "start:dashboard": "node dist/src/dashboard-service/index.js", "build:check": "npm run build -- --noEmit", "lint": "eslint --ext .ts ./src ./test", "lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test", @@ -37,6 +39,8 @@ "db:seed": "knex seed:run", "pretest:unit": "mkdir -p .test-reports/unit", "test:unit": "mocha 'test/**/*.spec.ts'", + "test:unit:dashboard": "mocha 'test/unit/dashboard-service/**/*.spec.ts'", + "test:e2e:dashboard:ws": "node scripts/check_dashboard_ws_updates.js", "test:unit:watch": "npm run test:unit -- --min --watch --watch-files src/**/*,test/**/*", "cover:unit": "nyc --report-dir .coverage/unit npm run test:unit", "docker:build": "docker build -t nostream .", diff --git a/src/dashboard-service/api/dashboard-router.ts b/src/dashboard-service/api/dashboard-router.ts new file mode 100644 index 00000000..dc977c52 --- /dev/null +++ b/src/dashboard-service/api/dashboard-router.ts @@ -0,0 +1,12 @@ +import { Router } from 'express' + +import { createGetKPISnapshotRequestHandler } from '../handlers/request-handlers/get-kpi-snapshot-request-handler' +import { SnapshotService } from '../services/snapshot-service' + +export const createDashboardRouter = (snapshotService: SnapshotService): Router => { + const router = Router() + + router.get('/snapshot', createGetKPISnapshotRequestHandler(snapshotService)) + + return router +} diff --git a/src/dashboard-service/app.ts b/src/dashboard-service/app.ts new file mode 100644 index 00000000..fd19a5cf --- /dev/null +++ b/src/dashboard-service/app.ts @@ -0,0 +1,116 @@ +import { createDashboardRouter } from './api/dashboard-router' +import { createLogger } from '../factories/logger-factory' +import { DashboardServiceConfig } from './config' +import { DashboardWebSocketHub } from './ws/dashboard-ws-hub' +import express from 'express' +import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler' +import http from 'http' +import { PollingScheduler } from './polling/polling-scheduler' +import { SnapshotService } from './services/snapshot-service' +import { WebSocketServer } from 'ws' + +const debug = createLogger('dashboard-service:app') + +export interface DashboardService { + readonly config: DashboardServiceConfig + readonly snapshotService: SnapshotService + readonly pollingScheduler: PollingScheduler + start(): Promise + stop(): Promise + getHttpPort(): number +} + +export const createDashboardService = (config: DashboardServiceConfig): DashboardService => { + console.info( + 'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d)', + config.host, + config.port, + config.wsPath, + config.pollIntervalMs, + ) + + const snapshotService = new SnapshotService() + + const app = express() + .disable('x-powered-by') + .get('/healthz', getHealthRequestHandler) + .use('/api/v1/kpis', createDashboardRouter(snapshotService)) + + const webServer = http.createServer(app) + const webSocketServer = new WebSocketServer({ + server: webServer, + path: config.wsPath, + }) + + const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot()) + + const pollingScheduler = new PollingScheduler(config.pollIntervalMs, () => { + const nextSnapshot = snapshotService.refreshPlaceholder() + debug('poll tick produced snapshot sequence=%d', nextSnapshot.sequence) + webSocketHub.broadcastTick(nextSnapshot.sequence) + webSocketHub.broadcastSnapshot(nextSnapshot) + }) + + const start = async () => { + if (webServer.listening) { + debug('start requested but service is already listening') + return + } + + console.info('dashboard-service: starting http and websocket servers') + + await new Promise((resolve, reject) => { + webServer.listen(config.port, config.host, () => { + const address = webServer.address() + debug('listening on %o', address) + console.info('dashboard-service: listening on %o', address) + resolve() + }) + webServer.once('error', (error) => { + console.error('dashboard-service: failed to start server', error) + reject(error) + }) + }) + + pollingScheduler.start() + console.info('dashboard-service: polling scheduler started') + } + + const stop = async () => { + console.info('dashboard-service: stopping service') + pollingScheduler.stop() + webSocketHub.close() + await new Promise((resolve, reject) => { + if (!webServer.listening) { + debug('stop requested while server was already stopped') + resolve() + return + } + + webServer.close((error) => { + if (error) { + console.error('dashboard-service: failed to stop cleanly', error) + reject(error) + return + } + + console.info('dashboard-service: http server closed') + resolve() + }) + }) + } + + const getHttpPort = (): number => { + const address = webServer.address() + return typeof address === 'object' && address !== null ? address.port : config.port + } + + return { + config, + snapshotService, + pollingScheduler, + start, + stop, + getHttpPort, + } +} diff --git a/src/dashboard-service/config.ts b/src/dashboard-service/config.ts new file mode 100644 index 00000000..3d888ae8 --- /dev/null +++ b/src/dashboard-service/config.ts @@ -0,0 +1,28 @@ +export interface DashboardServiceConfig { + host: string + port: number + wsPath: string + pollIntervalMs: number +} + +const parseInteger = (value: string | undefined, fallback: number): number => { + if (typeof value === 'undefined' || value === '') { + return fallback + } + + const parsed = Number(value) + if (!Number.isInteger(parsed) || parsed < 0) { + return fallback + } + + return parsed +} + +export const getDashboardServiceConfig = (): DashboardServiceConfig => { + return { + host: process.env.DASHBOARD_SERVICE_HOST ?? '127.0.0.1', + port: parseInteger(process.env.DASHBOARD_SERVICE_PORT, 8011), + wsPath: process.env.DASHBOARD_WS_PATH ?? '/api/v1/kpis/stream', + pollIntervalMs: parseInteger(process.env.DASHBOARD_POLL_INTERVAL_MS, 5000), + } +} diff --git a/src/dashboard-service/controllers/get-health-controller.ts b/src/dashboard-service/controllers/get-health-controller.ts new file mode 100644 index 00000000..0c7037a3 --- /dev/null +++ b/src/dashboard-service/controllers/get-health-controller.ts @@ -0,0 +1,12 @@ +import { Request, Response } from 'express' + +import { IController } from '../../@types/controllers' + +export class GetHealthController implements IController { + public async handleRequest(_request: Request, response: Response): Promise { + response + .status(200) + .setHeader('content-type', 'application/json; charset=utf-8') + .send({ status: 'ok' }) + } +} \ No newline at end of file diff --git a/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts new file mode 100644 index 00000000..741d9902 --- /dev/null +++ b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts @@ -0,0 +1,20 @@ +import { Request, Response } from 'express' + +import { DashboardSnapshotResponse } from '../types' +import { IController } from '../../@types/controllers' +import { SnapshotService } from '../services/snapshot-service' + +export class GetKPISnapshotController implements IController { + public constructor(private readonly snapshotService: SnapshotService) { } + + public async handleRequest(_request: Request, response: Response): Promise { + const payload: DashboardSnapshotResponse = { + data: this.snapshotService.getSnapshot(), + } + + response + .status(200) + .setHeader('content-type', 'application/json; charset=utf-8') + .send(payload) + } +} \ No newline at end of file diff --git a/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts new file mode 100644 index 00000000..e8f2cea4 --- /dev/null +++ b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts @@ -0,0 +1,5 @@ +import { withController } from '../../../handlers/request-handlers/with-controller-request-handler' + +import { GetHealthController } from '../../controllers/get-health-controller' + +export const getHealthRequestHandler = withController(() => new GetHealthController()) \ No newline at end of file diff --git a/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts new file mode 100644 index 00000000..507e181b --- /dev/null +++ b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts @@ -0,0 +1,8 @@ +import { withController } from '../../../handlers/request-handlers/with-controller-request-handler' + +import { GetKPISnapshotController } from '../../controllers/get-kpi-snapshot-controller' +import { SnapshotService } from '../../services/snapshot-service' + +export const createGetKPISnapshotRequestHandler = (snapshotService: SnapshotService) => { + return withController(() => new GetKPISnapshotController(snapshotService)) +} \ No newline at end of file diff --git a/src/dashboard-service/index.ts b/src/dashboard-service/index.ts new file mode 100644 index 00000000..4f64161c --- /dev/null +++ b/src/dashboard-service/index.ts @@ -0,0 +1,42 @@ +import { createLogger } from '../factories/logger-factory' + +import { createDashboardService } from './app' +import { getDashboardServiceConfig } from './config' + +const debug = createLogger('dashboard-service:index') + +const run = async () => { + const config = getDashboardServiceConfig() + console.info('dashboard-service: bootstrapping with config %o', config) + const service = createDashboardService(config) + + const shutdown = async () => { + console.info('dashboard-service: received shutdown signal') + debug('received shutdown signal') + await service.stop() + process.exit(0) + } + + process + .on('SIGINT', shutdown) + .on('SIGTERM', shutdown) + + process.on('uncaughtException', (error) => { + console.error('dashboard-service: uncaught exception', error) + }) + + process.on('unhandledRejection', (error) => { + console.error('dashboard-service: unhandled rejection', error) + }) + + await service.start() +} + +if (require.main === module) { + run().catch((error) => { + console.error('dashboard-service: unable to start', error) + process.exit(1) + }) +} + +export { run } diff --git a/src/dashboard-service/polling/polling-scheduler.ts b/src/dashboard-service/polling/polling-scheduler.ts new file mode 100644 index 00000000..c24898d9 --- /dev/null +++ b/src/dashboard-service/polling/polling-scheduler.ts @@ -0,0 +1,43 @@ +import { createLogger } from '../../factories/logger-factory' + +type Tick = () => Promise | void + +const debug = createLogger('dashboard-service:polling') + +export class PollingScheduler { + private timer: NodeJS.Timer | undefined + + public constructor( + private readonly intervalMs: number, + private readonly tick: Tick, + ) { } + + public start(): void { + if (this.timer) { + return + } + + debug('starting scheduler with interval %d ms', this.intervalMs) + + this.timer = setInterval(() => { + Promise.resolve(this.tick()) + .catch((error) => { + console.error('dashboard-service: polling tick failed', error) + }) + }, this.intervalMs) + } + + public stop(): void { + if (!this.timer) { + return + } + + debug('stopping scheduler') + clearInterval(this.timer) + this.timer = undefined + } + + public isRunning(): boolean { + return Boolean(this.timer) + } +} diff --git a/src/dashboard-service/services/kpi-collector-service.ts b/src/dashboard-service/services/kpi-collector-service.ts new file mode 100644 index 00000000..51139733 --- /dev/null +++ b/src/dashboard-service/services/kpi-collector-service.ts @@ -0,0 +1,134 @@ +import { DashboardMetrics, EventsByKindCount, TopTalker } from '../types' +import { createLogger } from '../../factories/logger-factory' +import { DatabaseClient } from '../../@types/base' + +const debug = createLogger('dashboard-service:kpi-collector') + +const DEFAULT_TRACKED_KINDS = [7, 1, 6, 1984, 4, 3, 9735] + +const toNumber = (value: unknown): number => { + if (typeof value === 'number') { + return value + } + + if (typeof value === 'string' && value !== '') { + return Number(value) + } + + return 0 +} + +export class KPICollectorService { + public constructor( + private readonly dbClient: DatabaseClient, + private readonly trackedKinds: number[] = DEFAULT_TRACKED_KINDS, + private readonly topTalkersLimit = 10, + private readonly recentDays = 3, + ) { } + + public async collectMetrics(): Promise { + debug('collecting dashboard metrics') + + const [ + eventsByKind, + admittedUsers, + satsPaid, + allTimeTopTalkers, + recentTopTalkers, + ] = await Promise.all([ + this.getEventsByKind(), + this.getAdmittedUsersCount(), + this.getSatsPaidCount(), + this.getTopTalkersAllTime(), + this.getTopTalkersRecent(), + ]) + + return { + eventsByKind, + admittedUsers, + satsPaid, + topTalkers: { + allTime: allTimeTopTalkers, + recent: recentTopTalkers, + }, + } + } + + private async getEventsByKind(): Promise { + const rows = await this.dbClient('events') + .select('event_kind') + .count('id as count') + .whereIn('event_kind', this.trackedKinds) + .groupBy('event_kind') + .orderBy('count', 'desc') as Array<{ event_kind: number, count: string }> + + const other = await this.dbClient('events') + .whereNotIn('event_kind', this.trackedKinds) + .count<{ count: string }>('id as count') + .first() + + const eventsByKind = rows.map((row) => { + return { + kind: String(row.event_kind), + count: toNumber(row.count), + } + }) + + eventsByKind.push({ + kind: 'other', + count: toNumber(other?.count), + }) + + return eventsByKind + } + + private async getAdmittedUsersCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .count<{ count: string }>('pubkey as count') + .first() + + return toNumber(result?.count) + } + + private async getSatsPaidCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .sum<{ total: string | null }>('balance as total') + .first() + + const millisats = toNumber(result?.total) + return millisats / 1000 + } + + private async getTopTalkersAllTime(): Promise { + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } + + private async getTopTalkersRecent(): Promise { + const since = new Date(Date.now() - this.recentDays * 24 * 60 * 60 * 1000) + + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .where('first_seen', '>=', since) + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } +} diff --git a/src/dashboard-service/services/snapshot-service.ts b/src/dashboard-service/services/snapshot-service.ts new file mode 100644 index 00000000..b1a511fe --- /dev/null +++ b/src/dashboard-service/services/snapshot-service.ts @@ -0,0 +1,34 @@ +import { KPISnapshot } from '../types' + +export class SnapshotService { + private sequence = 0 + + private snapshot: KPISnapshot = { + sequence: this.sequence, + generatedAt: new Date(0).toISOString(), + status: 'placeholder', + metrics: { + eventsByKind: [], + admittedUsers: null, + satsPaid: null, + topTalkers: [], + }, + } + + public getSnapshot(): KPISnapshot { + return this.snapshot + } + + // Phase 1 placeholder: advances sequence/time so polling and websocket flow can be validated end-to-end. + public refreshPlaceholder(): KPISnapshot { + this.sequence += 1 + + this.snapshot = { + ...this.snapshot, + sequence: this.sequence, + generatedAt: new Date().toISOString(), + } + + return this.snapshot + } +} diff --git a/src/dashboard-service/types.ts b/src/dashboard-service/types.ts new file mode 100644 index 00000000..02885a36 --- /dev/null +++ b/src/dashboard-service/types.ts @@ -0,0 +1,30 @@ +export interface TopTalker { + pubkey: string + count: number +} + +export interface KPISnapshot { + sequence: number + generatedAt: string + status: 'placeholder' + metrics: { + eventsByKind: Array<{ kind: string, count: number }> + admittedUsers: number | null + satsPaid: number | null + topTalkers: TopTalker[] + } +} + +export interface DashboardSnapshotResponse { + data: KPISnapshot +} + +export interface DashboardWebSocketEnvelope { + type: TType + payload: TPayload +} + +export type DashboardServerMessage = + | DashboardWebSocketEnvelope<'dashboard.connected', { at: string }> + | DashboardWebSocketEnvelope<'kpi.snapshot', KPISnapshot> + | DashboardWebSocketEnvelope<'kpi.tick', { at: string, sequence: number }> diff --git a/src/dashboard-service/ws/dashboard-ws-hub.ts b/src/dashboard-service/ws/dashboard-ws-hub.ts new file mode 100644 index 00000000..1b3b330d --- /dev/null +++ b/src/dashboard-service/ws/dashboard-ws-hub.ts @@ -0,0 +1,141 @@ +import { DashboardServerMessage, KPISnapshot } from '../types' +import { RawData, WebSocketServer } from 'ws' +import { createLogger } from '../../factories/logger-factory' +import WebSocket from 'ws' + +const debug = createLogger('dashboard-service:ws') + +export class DashboardWebSocketHub { + public constructor( + private readonly webSocketServer: WebSocketServer, + private readonly getSnapshot: () => KPISnapshot, + ) { + console.info('dashboard-service: websocket hub initialized') + + this.webSocketServer + .on('connection', this.onConnection.bind(this)) + .on('close', () => { + console.info('dashboard-service: websocket server closed') + }) + .on('error', (error) => { + console.error('dashboard-service: websocket server error', error) + }) + } + + public broadcastSnapshot(snapshot: KPISnapshot): void { + this.broadcast({ + type: 'kpi.snapshot', + payload: snapshot, + }) + } + + public broadcastTick(sequence: number): void { + this.broadcast({ + type: 'kpi.tick', + payload: { + at: new Date().toISOString(), + sequence, + }, + }) + } + + public close(): void { + console.info('dashboard-service: closing websocket hub') + this.webSocketServer.clients.forEach((client) => { + client.close() + }) + this.webSocketServer.removeAllListeners() + } + + private onConnection(client: WebSocket): void { + const connectedClients = this.getConnectedClientsCount() + console.info('dashboard-service: websocket client connected (clients=%d)', connectedClients) + + client + .on('close', (code, reason) => { + console.info( + 'dashboard-service: websocket client disconnected (code=%d, reason=%s, clients=%d)', + code, + reason.toString(), + this.getConnectedClientsCount(), + ) + }) + .on('error', (error) => { + console.error('dashboard-service: websocket client error', error) + }) + .on('message', (raw) => { + this.onClientMessage(raw) + }) + + this.send(client, { + type: 'dashboard.connected', + payload: { + at: new Date().toISOString(), + }, + }) + + this.send(client, { + type: 'kpi.snapshot', + payload: this.getSnapshot(), + }) + + debug('dashboard websocket bootstrap snapshot sent') + } + + private onClientMessage(raw: RawData): void { + try { + const rawMessage = this.toUtf8(raw) + const message = JSON.parse(rawMessage) + debug('dashboard websocket client message received: %o', message) + } catch (error) { + console.error('dashboard-service: websocket message parsing failed', error) + } + } + + private broadcast(message: DashboardServerMessage): void { + this.webSocketServer.clients.forEach((client) => { + if (client.readyState !== WebSocket.OPEN) { + return + } + this.send(client, message) + }) + } + + private send(client: WebSocket, message: DashboardServerMessage): void { + if (client.readyState !== WebSocket.OPEN) { + return + } + + try { + client.send(JSON.stringify(message)) + } catch (error) { + console.error('dashboard-service: websocket send failed', error) + } + } + + private toUtf8(raw: RawData): string { + if (typeof raw === 'string') { + return raw + } + + if (Buffer.isBuffer(raw)) { + return raw.toString('utf8') + } + + if (Array.isArray(raw)) { + return raw.map((chunk) => { + if (Buffer.isBuffer(chunk)) { + return chunk.toString('utf8') + } + + return Buffer.from(chunk as ArrayBuffer).toString('utf8') + }).join('') + } + + return Buffer.from(raw).toString('utf8') + } + + private getConnectedClientsCount(): number { + return Array.from(this.webSocketServer.clients).filter((client) => client.readyState === WebSocket.OPEN).length + } +} diff --git a/test/unit/dashboard-service/app.spec.ts b/test/unit/dashboard-service/app.spec.ts new file mode 100644 index 00000000..67a092d6 --- /dev/null +++ b/test/unit/dashboard-service/app.spec.ts @@ -0,0 +1,43 @@ +import axios from 'axios' +import { createDashboardService } from '../../../src/dashboard-service/app' +import { expect } from 'chai' +import WebSocket from 'ws' + +describe('dashboard-service app', () => { + it('serves health, snapshot, and websocket endpoints', async () => { + const service = createDashboardService({ + host: '127.0.0.1', + port: 0, + wsPath: '/api/v1/kpis/stream', + pollIntervalMs: 1000, + }) + + await service.start() + + const port = service.getHttpPort() + + const healthResponse = await axios.get(`http://127.0.0.1:${port}/healthz`) + expect(healthResponse.status).to.equal(200) + + const snapshotResponse = await axios.get(`http://127.0.0.1:${port}/api/v1/kpis/snapshot`) + expect(snapshotResponse.status).to.equal(200) + + const snapshotJson = snapshotResponse.data as any + expect(snapshotJson.data).to.have.property('sequence') + + const ws = new WebSocket(`ws://127.0.0.1:${port}/api/v1/kpis/stream`) + + const connectedEvent = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('timeout waiting for ws message')), 2000) + ws.once('message', (raw) => { + clearTimeout(timeout) + resolve(JSON.parse(raw.toString())) + }) + }) + + expect(connectedEvent).to.have.property('type', 'dashboard.connected') + + ws.close() + await service.stop() + }) +}) diff --git a/test/unit/dashboard-service/polling-scheduler.spec.ts b/test/unit/dashboard-service/polling-scheduler.spec.ts new file mode 100644 index 00000000..28d1c929 --- /dev/null +++ b/test/unit/dashboard-service/polling-scheduler.spec.ts @@ -0,0 +1,39 @@ +import { expect } from 'chai' +import Sinon from 'sinon' + +import { PollingScheduler } from '../../../src/dashboard-service/polling/polling-scheduler' + +describe('PollingScheduler', () => { + let clock: Sinon.SinonFakeTimers + + beforeEach(() => { + clock = Sinon.useFakeTimers() + }) + + afterEach(() => { + clock.restore() + }) + + it('runs tick callback on interval while running', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(1000, tick) + + scheduler.start() + await clock.tickAsync(3000) + + expect(tick.callCount).to.equal(3) + scheduler.stop() + }) + + it('stops running when stop is called', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(500, tick) + + scheduler.start() + await clock.tickAsync(1000) + scheduler.stop() + await clock.tickAsync(1000) + + expect(tick.callCount).to.equal(2) + }) +})