From 057634fba8ae8b02874b732644f83ab41b8b0a2e Mon Sep 17 00:00:00 2001 From: Kushagra Date: Thu, 9 Apr 2026 04:52:52 +0530 Subject: [PATCH 01/12] fix: added expired_at filter to message pipeline --- src/handlers/subscribe-message-handler.ts | 10 ++- .../subscribe-message-handler.spec.ts | 61 ++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/src/handlers/subscribe-message-handler.ts b/src/handlers/subscribe-message-handler.ts index 1ab4e322..87fbde95 100644 --- a/src/handlers/subscribe-message-handler.ts +++ b/src/handlers/subscribe-message-handler.ts @@ -4,7 +4,7 @@ import { pipeline } from 'stream/promises' import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../utils/messages' import { IAbortable, IMessageHandler } from '../@types/message-handlers' -import { isEventMatchingFilter, toNostrEvent } from '../utils/event' +import { isEventMatchingFilter, isExpiredEvent, toNostrEvent } from '../utils/event' import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream' import { SubscriptionFilter, SubscriptionId } from '../@types/subscription' import { createLogger } from '../factories/logger-factory' @@ -55,6 +55,12 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { const sendEOSE = () => this.webSocket.emit(WebSocketAdapterEvent.Message, createEndOfStoredEventsNoticeMessage(subscriptionId)) const isSubscribedToEvent = SubscribeMessageHandler.isClientSubscribedToEvent(filters) + const isNotExpired = (event: Event)=>{ + if (isExpiredEvent(event)) { + return false + } + return true + } const findEvents = this.eventRepository.findByFilters(filters).stream() @@ -65,6 +71,7 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { findEvents, streamFilter(propSatisfies(isNil, 'deleted_at')), streamMap(toNostrEvent), + streamFilter(isNotExpired), streamFilter(isSubscribedToEvent), streamEach(sendEvent), streamEnd(sendEOSE), @@ -117,3 +124,4 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { } } + diff --git a/test/unit/handlers/subscribe-message-handler.spec.ts b/test/unit/handlers/subscribe-message-handler.spec.ts index adcdbb5a..c269a75e 100644 --- a/test/unit/handlers/subscribe-message-handler.spec.ts +++ b/test/unit/handlers/subscribe-message-handler.spec.ts @@ -3,6 +3,7 @@ import chai from 'chai' import chaiAsPromised from 'chai-as-promised' import EventEmitter from 'events' import Sinon from 'sinon' +import sinonChai from 'sinon-chai' import { IAbortable, IMessageHandler } from '../../../src/@types/message-handlers' import { MessageType, SubscribeMessage } from '../../../src/@types/messages' @@ -14,10 +15,14 @@ import { PassThrough } from 'stream' import { SubscribeMessageHandler } from '../../../src/handlers/subscribe-message-handler' import { WebSocketAdapterEvent } from '../../../src/constants/adapter' +chai.use(sinonChai) chai.use(chaiAsPromised) const { expect } = chai -const toDbEvent = (event: Event) => ({ +const toDbEvent = ( + event: Event, + metadata: { expires_at?: number, deleted_at?: Date | null } = {}, +) => ({ event_id: Buffer.from(event.id, 'hex'), event_kind: event.kind, event_pubkey: Buffer.from(event.pubkey, 'hex'), @@ -25,6 +30,7 @@ const toDbEvent = (event: Event) => ({ event_content: event.content, event_tags: event.tags, event_signature: Buffer.from(event.sig, 'hex'), + ...metadata, }) describe('SubscribeMessageHandler', () => { @@ -112,11 +118,13 @@ describe('SubscribeMessageHandler', () => { describe('#fetchAndSend', () => { let event: Event + let clock: Sinon.SinonFakeTimers let webSocketOnMessageStub: Sinon.SinonStub let webSocketOnSubscribeStub: Sinon.SinonStub let isClientSubscribedToEventStub: Sinon.SinonStub beforeEach(() => { + clock = Sinon.useFakeTimers(1665546189000) event = { 'id': 'b1601d26958e6508b7b9df0af609c652346c09392b6534d93aead9819a51b4ef', 'pubkey': '22e804d26ed16b68db5259e78449e96dab5d464c8f470bda3eb1a70467f2c793', @@ -136,6 +144,10 @@ describe('SubscribeMessageHandler', () => { //streamEndSpy = sandbox.spy(Stream, '_end' as any) }) + afterEach(() => { + clock.restore() + }) + it('does not send event if client is not subscribed to it', async () => { isClientSubscribedToEventStub.returns(always(false)) @@ -165,6 +177,53 @@ describe('SubscribeMessageHandler', () => { ) }) + it('does not send expired events', async () => { + isClientSubscribedToEventStub.returns(always(true)) + + const now = Math.floor(clock.now / 1000) + const promise = (handler as any).fetchAndSend(subscriptionId, filters) + + const expiredEvent: Event = { + ...event, + tags: [['expiration', String(now - 1)] as any], + } + + stream.write(toDbEvent(expiredEvent)) + stream.end() + + await promise + + expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters) + expect(webSocketOnMessageStub).to.have.been.calledOnceWithExactly( + ['EOSE', subscriptionId], + ) + }) + + it('sends event if expiration is in the future', async () => { + isClientSubscribedToEventStub.returns(always(true)) + + const now = Math.floor(clock.now / 1000) + const promise = (handler as any).fetchAndSend(subscriptionId, filters) + + const eventWithFutureExpiration: Event = { + ...event, + tags: [['expiration', String(now + 60)] as any], + } + + stream.write(toDbEvent(eventWithFutureExpiration)) + stream.end() + + await promise + + expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters) + expect(webSocketOnMessageStub).to.have.been.calledWithExactly( + ['EVENT', subscriptionId, eventWithFutureExpiration], + ) + expect(webSocketOnMessageStub).to.have.been.calledWithExactly( + ['EOSE', subscriptionId], + ) + }) + it('sends EOSE', async () => { const promise = (handler as any).fetchAndSend(subscriptionId, filters) From 8d1771baa64799ab97bbf2bc0b499dcd2a1fd1ef Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:41:07 +0530 Subject: [PATCH 02/12] feat(dashboard): Implemented basic snapshot service, polling service and ws connection management --- .../polling/polling-scheduler.ts | 43 ++++++ .../services/snapshot-service.ts | 34 +++++ src/dashboard-service/ws/dashboard-ws-hub.ts | 141 ++++++++++++++++++ 3 files changed, 218 insertions(+) create mode 100644 src/dashboard-service/polling/polling-scheduler.ts create mode 100644 src/dashboard-service/services/snapshot-service.ts create mode 100644 src/dashboard-service/ws/dashboard-ws-hub.ts diff --git a/src/dashboard-service/polling/polling-scheduler.ts b/src/dashboard-service/polling/polling-scheduler.ts new file mode 100644 index 00000000..c24898d9 --- /dev/null +++ b/src/dashboard-service/polling/polling-scheduler.ts @@ -0,0 +1,43 @@ +import { createLogger } from '../../factories/logger-factory' + +type Tick = () => Promise | void + +const debug = createLogger('dashboard-service:polling') + +export class PollingScheduler { + private timer: NodeJS.Timer | undefined + + public constructor( + private readonly intervalMs: number, + private readonly tick: Tick, + ) { } + + public start(): void { + if (this.timer) { + return + } + + debug('starting scheduler with interval %d ms', this.intervalMs) + + this.timer = setInterval(() => { + Promise.resolve(this.tick()) + .catch((error) => { + console.error('dashboard-service: polling tick failed', error) + }) + }, this.intervalMs) + } + + public stop(): void { + if (!this.timer) { + return + } + + debug('stopping scheduler') + clearInterval(this.timer) + this.timer = undefined + } + + public isRunning(): boolean { + return Boolean(this.timer) + } +} diff --git a/src/dashboard-service/services/snapshot-service.ts b/src/dashboard-service/services/snapshot-service.ts new file mode 100644 index 00000000..b1a511fe --- /dev/null +++ b/src/dashboard-service/services/snapshot-service.ts @@ -0,0 +1,34 @@ +import { KPISnapshot } from '../types' + +export class SnapshotService { + private sequence = 0 + + private snapshot: KPISnapshot = { + sequence: this.sequence, + generatedAt: new Date(0).toISOString(), + status: 'placeholder', + metrics: { + eventsByKind: [], + admittedUsers: null, + satsPaid: null, + topTalkers: [], + }, + } + + public getSnapshot(): KPISnapshot { + return this.snapshot + } + + // Phase 1 placeholder: advances sequence/time so polling and websocket flow can be validated end-to-end. + public refreshPlaceholder(): KPISnapshot { + this.sequence += 1 + + this.snapshot = { + ...this.snapshot, + sequence: this.sequence, + generatedAt: new Date().toISOString(), + } + + return this.snapshot + } +} diff --git a/src/dashboard-service/ws/dashboard-ws-hub.ts b/src/dashboard-service/ws/dashboard-ws-hub.ts new file mode 100644 index 00000000..1b3b330d --- /dev/null +++ b/src/dashboard-service/ws/dashboard-ws-hub.ts @@ -0,0 +1,141 @@ +import { DashboardServerMessage, KPISnapshot } from '../types' +import { RawData, WebSocketServer } from 'ws' +import { createLogger } from '../../factories/logger-factory' +import WebSocket from 'ws' + +const debug = createLogger('dashboard-service:ws') + +export class DashboardWebSocketHub { + public constructor( + private readonly webSocketServer: WebSocketServer, + private readonly getSnapshot: () => KPISnapshot, + ) { + console.info('dashboard-service: websocket hub initialized') + + this.webSocketServer + .on('connection', this.onConnection.bind(this)) + .on('close', () => { + console.info('dashboard-service: websocket server closed') + }) + .on('error', (error) => { + console.error('dashboard-service: websocket server error', error) + }) + } + + public broadcastSnapshot(snapshot: KPISnapshot): void { + this.broadcast({ + type: 'kpi.snapshot', + payload: snapshot, + }) + } + + public broadcastTick(sequence: number): void { + this.broadcast({ + type: 'kpi.tick', + payload: { + at: new Date().toISOString(), + sequence, + }, + }) + } + + public close(): void { + console.info('dashboard-service: closing websocket hub') + this.webSocketServer.clients.forEach((client) => { + client.close() + }) + this.webSocketServer.removeAllListeners() + } + + private onConnection(client: WebSocket): void { + const connectedClients = this.getConnectedClientsCount() + console.info('dashboard-service: websocket client connected (clients=%d)', connectedClients) + + client + .on('close', (code, reason) => { + console.info( + 'dashboard-service: websocket client disconnected (code=%d, reason=%s, clients=%d)', + code, + reason.toString(), + this.getConnectedClientsCount(), + ) + }) + .on('error', (error) => { + console.error('dashboard-service: websocket client error', error) + }) + .on('message', (raw) => { + this.onClientMessage(raw) + }) + + this.send(client, { + type: 'dashboard.connected', + payload: { + at: new Date().toISOString(), + }, + }) + + this.send(client, { + type: 'kpi.snapshot', + payload: this.getSnapshot(), + }) + + debug('dashboard websocket bootstrap snapshot sent') + } + + private onClientMessage(raw: RawData): void { + try { + const rawMessage = this.toUtf8(raw) + const message = JSON.parse(rawMessage) + debug('dashboard websocket client message received: %o', message) + } catch (error) { + console.error('dashboard-service: websocket message parsing failed', error) + } + } + + private broadcast(message: DashboardServerMessage): void { + this.webSocketServer.clients.forEach((client) => { + if (client.readyState !== WebSocket.OPEN) { + return + } + this.send(client, message) + }) + } + + private send(client: WebSocket, message: DashboardServerMessage): void { + if (client.readyState !== WebSocket.OPEN) { + return + } + + try { + client.send(JSON.stringify(message)) + } catch (error) { + console.error('dashboard-service: websocket send failed', error) + } + } + + private toUtf8(raw: RawData): string { + if (typeof raw === 'string') { + return raw + } + + if (Buffer.isBuffer(raw)) { + return raw.toString('utf8') + } + + if (Array.isArray(raw)) { + return raw.map((chunk) => { + if (Buffer.isBuffer(chunk)) { + return chunk.toString('utf8') + } + + return Buffer.from(chunk as ArrayBuffer).toString('utf8') + }).join('') + } + + return Buffer.from(raw).toString('utf8') + } + + private getConnectedClientsCount(): number { + return Array.from(this.webSocketServer.clients).filter((client) => client.readyState === WebSocket.OPEN).length + } +} From 4e8c91660803cff295a1009dde9b0ce9af002158 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:42:47 +0530 Subject: [PATCH 03/12] feat(dashboard): Implemented basic message types, handlers and API endpoints --- src/dashboard-service/api/dashboard-router.ts | 12 ++++++++ .../controllers/get-health-controller.ts | 12 ++++++++ .../get-kpi-snapshot-controller.ts | 20 +++++++++++++ .../get-health-request-handler.ts | 5 ++++ .../get-kpi-snapshot-request-handler.ts | 8 +++++ src/dashboard-service/types.ts | 30 +++++++++++++++++++ 6 files changed, 87 insertions(+) create mode 100644 src/dashboard-service/api/dashboard-router.ts create mode 100644 src/dashboard-service/controllers/get-health-controller.ts create mode 100644 src/dashboard-service/controllers/get-kpi-snapshot-controller.ts create mode 100644 src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts create mode 100644 src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts create mode 100644 src/dashboard-service/types.ts diff --git a/src/dashboard-service/api/dashboard-router.ts b/src/dashboard-service/api/dashboard-router.ts new file mode 100644 index 00000000..dc977c52 --- /dev/null +++ b/src/dashboard-service/api/dashboard-router.ts @@ -0,0 +1,12 @@ +import { Router } from 'express' + +import { createGetKPISnapshotRequestHandler } from '../handlers/request-handlers/get-kpi-snapshot-request-handler' +import { SnapshotService } from '../services/snapshot-service' + +export const createDashboardRouter = (snapshotService: SnapshotService): Router => { + const router = Router() + + router.get('/snapshot', createGetKPISnapshotRequestHandler(snapshotService)) + + return router +} diff --git a/src/dashboard-service/controllers/get-health-controller.ts b/src/dashboard-service/controllers/get-health-controller.ts new file mode 100644 index 00000000..0c7037a3 --- /dev/null +++ b/src/dashboard-service/controllers/get-health-controller.ts @@ -0,0 +1,12 @@ +import { Request, Response } from 'express' + +import { IController } from '../../@types/controllers' + +export class GetHealthController implements IController { + public async handleRequest(_request: Request, response: Response): Promise { + response + .status(200) + .setHeader('content-type', 'application/json; charset=utf-8') + .send({ status: 'ok' }) + } +} \ No newline at end of file diff --git a/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts new file mode 100644 index 00000000..741d9902 --- /dev/null +++ b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts @@ -0,0 +1,20 @@ +import { Request, Response } from 'express' + +import { DashboardSnapshotResponse } from '../types' +import { IController } from '../../@types/controllers' +import { SnapshotService } from '../services/snapshot-service' + +export class GetKPISnapshotController implements IController { + public constructor(private readonly snapshotService: SnapshotService) { } + + public async handleRequest(_request: Request, response: Response): Promise { + const payload: DashboardSnapshotResponse = { + data: this.snapshotService.getSnapshot(), + } + + response + .status(200) + .setHeader('content-type', 'application/json; charset=utf-8') + .send(payload) + } +} \ No newline at end of file diff --git a/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts new file mode 100644 index 00000000..e8f2cea4 --- /dev/null +++ b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts @@ -0,0 +1,5 @@ +import { withController } from '../../../handlers/request-handlers/with-controller-request-handler' + +import { GetHealthController } from '../../controllers/get-health-controller' + +export const getHealthRequestHandler = withController(() => new GetHealthController()) \ No newline at end of file diff --git a/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts new file mode 100644 index 00000000..507e181b --- /dev/null +++ b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts @@ -0,0 +1,8 @@ +import { withController } from '../../../handlers/request-handlers/with-controller-request-handler' + +import { GetKPISnapshotController } from '../../controllers/get-kpi-snapshot-controller' +import { SnapshotService } from '../../services/snapshot-service' + +export const createGetKPISnapshotRequestHandler = (snapshotService: SnapshotService) => { + return withController(() => new GetKPISnapshotController(snapshotService)) +} \ No newline at end of file diff --git a/src/dashboard-service/types.ts b/src/dashboard-service/types.ts new file mode 100644 index 00000000..02885a36 --- /dev/null +++ b/src/dashboard-service/types.ts @@ -0,0 +1,30 @@ +export interface TopTalker { + pubkey: string + count: number +} + +export interface KPISnapshot { + sequence: number + generatedAt: string + status: 'placeholder' + metrics: { + eventsByKind: Array<{ kind: string, count: number }> + admittedUsers: number | null + satsPaid: number | null + topTalkers: TopTalker[] + } +} + +export interface DashboardSnapshotResponse { + data: KPISnapshot +} + +export interface DashboardWebSocketEnvelope { + type: TType + payload: TPayload +} + +export type DashboardServerMessage = + | DashboardWebSocketEnvelope<'dashboard.connected', { at: string }> + | DashboardWebSocketEnvelope<'kpi.snapshot', KPISnapshot> + | DashboardWebSocketEnvelope<'kpi.tick', { at: string, sequence: number }> From 564f405a349e025ff118d1d05798fd32d3369859 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:43:51 +0530 Subject: [PATCH 04/12] feat(dashboard): Wired dashboard server --- src/dashboard-service/app.ts | 116 ++++++++++++++++++++++++++++++++ src/dashboard-service/config.ts | 28 ++++++++ src/dashboard-service/index.ts | 42 ++++++++++++ 3 files changed, 186 insertions(+) create mode 100644 src/dashboard-service/app.ts create mode 100644 src/dashboard-service/config.ts create mode 100644 src/dashboard-service/index.ts diff --git a/src/dashboard-service/app.ts b/src/dashboard-service/app.ts new file mode 100644 index 00000000..fd19a5cf --- /dev/null +++ b/src/dashboard-service/app.ts @@ -0,0 +1,116 @@ +import { createDashboardRouter } from './api/dashboard-router' +import { createLogger } from '../factories/logger-factory' +import { DashboardServiceConfig } from './config' +import { DashboardWebSocketHub } from './ws/dashboard-ws-hub' +import express from 'express' +import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler' +import http from 'http' +import { PollingScheduler } from './polling/polling-scheduler' +import { SnapshotService } from './services/snapshot-service' +import { WebSocketServer } from 'ws' + +const debug = createLogger('dashboard-service:app') + +export interface DashboardService { + readonly config: DashboardServiceConfig + readonly snapshotService: SnapshotService + readonly pollingScheduler: PollingScheduler + start(): Promise + stop(): Promise + getHttpPort(): number +} + +export const createDashboardService = (config: DashboardServiceConfig): DashboardService => { + console.info( + 'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d)', + config.host, + config.port, + config.wsPath, + config.pollIntervalMs, + ) + + const snapshotService = new SnapshotService() + + const app = express() + .disable('x-powered-by') + .get('/healthz', getHealthRequestHandler) + .use('/api/v1/kpis', createDashboardRouter(snapshotService)) + + const webServer = http.createServer(app) + const webSocketServer = new WebSocketServer({ + server: webServer, + path: config.wsPath, + }) + + const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot()) + + const pollingScheduler = new PollingScheduler(config.pollIntervalMs, () => { + const nextSnapshot = snapshotService.refreshPlaceholder() + debug('poll tick produced snapshot sequence=%d', nextSnapshot.sequence) + webSocketHub.broadcastTick(nextSnapshot.sequence) + webSocketHub.broadcastSnapshot(nextSnapshot) + }) + + const start = async () => { + if (webServer.listening) { + debug('start requested but service is already listening') + return + } + + console.info('dashboard-service: starting http and websocket servers') + + await new Promise((resolve, reject) => { + webServer.listen(config.port, config.host, () => { + const address = webServer.address() + debug('listening on %o', address) + console.info('dashboard-service: listening on %o', address) + resolve() + }) + webServer.once('error', (error) => { + console.error('dashboard-service: failed to start server', error) + reject(error) + }) + }) + + pollingScheduler.start() + console.info('dashboard-service: polling scheduler started') + } + + const stop = async () => { + console.info('dashboard-service: stopping service') + pollingScheduler.stop() + webSocketHub.close() + await new Promise((resolve, reject) => { + if (!webServer.listening) { + debug('stop requested while server was already stopped') + resolve() + return + } + + webServer.close((error) => { + if (error) { + console.error('dashboard-service: failed to stop cleanly', error) + reject(error) + return + } + + console.info('dashboard-service: http server closed') + resolve() + }) + }) + } + + const getHttpPort = (): number => { + const address = webServer.address() + return typeof address === 'object' && address !== null ? address.port : config.port + } + + return { + config, + snapshotService, + pollingScheduler, + start, + stop, + getHttpPort, + } +} diff --git a/src/dashboard-service/config.ts b/src/dashboard-service/config.ts new file mode 100644 index 00000000..3d888ae8 --- /dev/null +++ b/src/dashboard-service/config.ts @@ -0,0 +1,28 @@ +export interface DashboardServiceConfig { + host: string + port: number + wsPath: string + pollIntervalMs: number +} + +const parseInteger = (value: string | undefined, fallback: number): number => { + if (typeof value === 'undefined' || value === '') { + return fallback + } + + const parsed = Number(value) + if (!Number.isInteger(parsed) || parsed < 0) { + return fallback + } + + return parsed +} + +export const getDashboardServiceConfig = (): DashboardServiceConfig => { + return { + host: process.env.DASHBOARD_SERVICE_HOST ?? '127.0.0.1', + port: parseInteger(process.env.DASHBOARD_SERVICE_PORT, 8011), + wsPath: process.env.DASHBOARD_WS_PATH ?? '/api/v1/kpis/stream', + pollIntervalMs: parseInteger(process.env.DASHBOARD_POLL_INTERVAL_MS, 5000), + } +} diff --git a/src/dashboard-service/index.ts b/src/dashboard-service/index.ts new file mode 100644 index 00000000..4f64161c --- /dev/null +++ b/src/dashboard-service/index.ts @@ -0,0 +1,42 @@ +import { createLogger } from '../factories/logger-factory' + +import { createDashboardService } from './app' +import { getDashboardServiceConfig } from './config' + +const debug = createLogger('dashboard-service:index') + +const run = async () => { + const config = getDashboardServiceConfig() + console.info('dashboard-service: bootstrapping with config %o', config) + const service = createDashboardService(config) + + const shutdown = async () => { + console.info('dashboard-service: received shutdown signal') + debug('received shutdown signal') + await service.stop() + process.exit(0) + } + + process + .on('SIGINT', shutdown) + .on('SIGTERM', shutdown) + + process.on('uncaughtException', (error) => { + console.error('dashboard-service: uncaught exception', error) + }) + + process.on('unhandledRejection', (error) => { + console.error('dashboard-service: unhandled rejection', error) + }) + + await service.start() +} + +if (require.main === module) { + run().catch((error) => { + console.error('dashboard-service: unable to start', error) + process.exit(1) + }) +} + +export { run } From a8d119bbc6f0e3052c2ca30a2995fd43aa24ebfe Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:45:09 +0530 Subject: [PATCH 05/12] feat(dashboard/test): Added tests for dashboard endpoints --- test/unit/dashboard-service/app.spec.ts | 43 +++++++++++++++++++ .../polling-scheduler.spec.ts | 39 +++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 test/unit/dashboard-service/app.spec.ts create mode 100644 test/unit/dashboard-service/polling-scheduler.spec.ts diff --git a/test/unit/dashboard-service/app.spec.ts b/test/unit/dashboard-service/app.spec.ts new file mode 100644 index 00000000..67a092d6 --- /dev/null +++ b/test/unit/dashboard-service/app.spec.ts @@ -0,0 +1,43 @@ +import axios from 'axios' +import { createDashboardService } from '../../../src/dashboard-service/app' +import { expect } from 'chai' +import WebSocket from 'ws' + +describe('dashboard-service app', () => { + it('serves health, snapshot, and websocket endpoints', async () => { + const service = createDashboardService({ + host: '127.0.0.1', + port: 0, + wsPath: '/api/v1/kpis/stream', + pollIntervalMs: 1000, + }) + + await service.start() + + const port = service.getHttpPort() + + const healthResponse = await axios.get(`http://127.0.0.1:${port}/healthz`) + expect(healthResponse.status).to.equal(200) + + const snapshotResponse = await axios.get(`http://127.0.0.1:${port}/api/v1/kpis/snapshot`) + expect(snapshotResponse.status).to.equal(200) + + const snapshotJson = snapshotResponse.data as any + expect(snapshotJson.data).to.have.property('sequence') + + const ws = new WebSocket(`ws://127.0.0.1:${port}/api/v1/kpis/stream`) + + const connectedEvent = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('timeout waiting for ws message')), 2000) + ws.once('message', (raw) => { + clearTimeout(timeout) + resolve(JSON.parse(raw.toString())) + }) + }) + + expect(connectedEvent).to.have.property('type', 'dashboard.connected') + + ws.close() + await service.stop() + }) +}) diff --git a/test/unit/dashboard-service/polling-scheduler.spec.ts b/test/unit/dashboard-service/polling-scheduler.spec.ts new file mode 100644 index 00000000..28d1c929 --- /dev/null +++ b/test/unit/dashboard-service/polling-scheduler.spec.ts @@ -0,0 +1,39 @@ +import { expect } from 'chai' +import Sinon from 'sinon' + +import { PollingScheduler } from '../../../src/dashboard-service/polling/polling-scheduler' + +describe('PollingScheduler', () => { + let clock: Sinon.SinonFakeTimers + + beforeEach(() => { + clock = Sinon.useFakeTimers() + }) + + afterEach(() => { + clock.restore() + }) + + it('runs tick callback on interval while running', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(1000, tick) + + scheduler.start() + await clock.tickAsync(3000) + + expect(tick.callCount).to.equal(3) + scheduler.stop() + }) + + it('stops running when stop is called', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(500, tick) + + scheduler.start() + await clock.tickAsync(1000) + scheduler.stop() + await clock.tickAsync(1000) + + expect(tick.callCount).to.equal(2) + }) +}) From 4b6e89b1e4eec6e3d125b6949766927146e34101 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:46:07 +0530 Subject: [PATCH 06/12] feat: wired test scripts --- package.json | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/package.json b/package.json index 9d9f902f..7039f1b7 100644 --- a/package.json +++ b/package.json @@ -23,10 +23,12 @@ "main": "src/index.ts", "scripts": { "dev": "node -r ts-node/register src/index.ts", + "dev:dashboard": "node -r ts-node/register src/dashboard-service/index.ts", "clean": "rimraf ./{dist,.nyc_output,.test-reports,.coverage}", "build": "tsc --project tsconfig.build.json", "prestart": "npm run build", "start": "cd dist && node src/index.js", + "start:dashboard": "node dist/src/dashboard-service/index.js", "build:check": "npm run build -- --noEmit", "lint": "eslint --ext .ts ./src ./test", "lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test", @@ -37,6 +39,8 @@ "db:seed": "knex seed:run", "pretest:unit": "mkdir -p .test-reports/unit", "test:unit": "mocha 'test/**/*.spec.ts'", + "test:unit:dashboard": "mocha 'test/unit/dashboard-service/**/*.spec.ts'", + "test:e2e:dashboard:ws": "node scripts/check_dashboard_ws_updates.js", "test:unit:watch": "npm run test:unit -- --min --watch --watch-files src/**/*,test/**/*", "cover:unit": "nyc --report-dir .coverage/unit npm run test:unit", "docker:build": "docker build -t nostream .", From 28807f441188dba7fb0dc882572d55d7a3783c0c Mon Sep 17 00:00:00 2001 From: Kushagra Date: Sun, 12 Apr 2026 04:22:35 +0530 Subject: [PATCH 07/12] feat(dashboard): Added simple polling based KPI collection service(whole table sanner) --- .../services/kpi-collector-service.ts | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 src/dashboard-service/services/kpi-collector-service.ts diff --git a/src/dashboard-service/services/kpi-collector-service.ts b/src/dashboard-service/services/kpi-collector-service.ts new file mode 100644 index 00000000..51139733 --- /dev/null +++ b/src/dashboard-service/services/kpi-collector-service.ts @@ -0,0 +1,134 @@ +import { DashboardMetrics, EventsByKindCount, TopTalker } from '../types' +import { createLogger } from '../../factories/logger-factory' +import { DatabaseClient } from '../../@types/base' + +const debug = createLogger('dashboard-service:kpi-collector') + +const DEFAULT_TRACKED_KINDS = [7, 1, 6, 1984, 4, 3, 9735] + +const toNumber = (value: unknown): number => { + if (typeof value === 'number') { + return value + } + + if (typeof value === 'string' && value !== '') { + return Number(value) + } + + return 0 +} + +export class KPICollectorService { + public constructor( + private readonly dbClient: DatabaseClient, + private readonly trackedKinds: number[] = DEFAULT_TRACKED_KINDS, + private readonly topTalkersLimit = 10, + private readonly recentDays = 3, + ) { } + + public async collectMetrics(): Promise { + debug('collecting dashboard metrics') + + const [ + eventsByKind, + admittedUsers, + satsPaid, + allTimeTopTalkers, + recentTopTalkers, + ] = await Promise.all([ + this.getEventsByKind(), + this.getAdmittedUsersCount(), + this.getSatsPaidCount(), + this.getTopTalkersAllTime(), + this.getTopTalkersRecent(), + ]) + + return { + eventsByKind, + admittedUsers, + satsPaid, + topTalkers: { + allTime: allTimeTopTalkers, + recent: recentTopTalkers, + }, + } + } + + private async getEventsByKind(): Promise { + const rows = await this.dbClient('events') + .select('event_kind') + .count('id as count') + .whereIn('event_kind', this.trackedKinds) + .groupBy('event_kind') + .orderBy('count', 'desc') as Array<{ event_kind: number, count: string }> + + const other = await this.dbClient('events') + .whereNotIn('event_kind', this.trackedKinds) + .count<{ count: string }>('id as count') + .first() + + const eventsByKind = rows.map((row) => { + return { + kind: String(row.event_kind), + count: toNumber(row.count), + } + }) + + eventsByKind.push({ + kind: 'other', + count: toNumber(other?.count), + }) + + return eventsByKind + } + + private async getAdmittedUsersCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .count<{ count: string }>('pubkey as count') + .first() + + return toNumber(result?.count) + } + + private async getSatsPaidCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .sum<{ total: string | null }>('balance as total') + .first() + + const millisats = toNumber(result?.total) + return millisats / 1000 + } + + private async getTopTalkersAllTime(): Promise { + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } + + private async getTopTalkersRecent(): Promise { + const since = new Date(Date.now() - this.recentDays * 24 * 60 * 60 * 1000) + + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .where('first_seen', '>=', since) + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } +} From d5e1e01d7777539babb624ba0f02c6f0d613de5f Mon Sep 17 00:00:00 2001 From: Kushagra Date: Sun, 12 Apr 2026 04:23:20 +0530 Subject: [PATCH 08/12] feat(dashboard): Added simple polling based KPI collection service(index based incremental scanning) --- .../incremental-kpi-collector-service.ts | 586 ++++++++++++++++++ 1 file changed, 586 insertions(+) create mode 100644 src/dashboard-service/services/incremental-kpi-collector-service.ts diff --git a/src/dashboard-service/services/incremental-kpi-collector-service.ts b/src/dashboard-service/services/incremental-kpi-collector-service.ts new file mode 100644 index 00000000..e26ce2b8 --- /dev/null +++ b/src/dashboard-service/services/incremental-kpi-collector-service.ts @@ -0,0 +1,586 @@ +import { DashboardMetrics, EventsByKindCount, TopTalker } from '../types' +import { createLogger } from '../../factories/logger-factory' +import { DatabaseClient } from '../../@types/base' + +const debug = createLogger('dashboard-service:incremental-kpi-collector') + +const DEFAULT_TRACKED_KINDS = [7, 1, 6, 1984, 4, 3, 9735] +const MINUTES_PER_DAY = 24 * 60 +const SATS_SCALE_FACTOR = 1000 + + +class MinHeap { + private readonly data: TopTalker[] = [] + + public constructor(private readonly maxSize: number) {} + + public push(item: TopTalker): void { + if (this.data.length < this.maxSize) { + this.data.push(item) + this.bubbleUp(this.data.length - 1) + } else if (this.data.length > 0 && item.count > this.data[0].count) { + this.data[0] = item + this.sinkDown(0) + } + } + + /** Returns the heap contents sorted descending by count. */ + public toSortedDescArray(): TopTalker[] { + return [...this.data].sort((a, b) => b.count - a.count) + } + + public get size(): number { + return this.data.length + } + + private bubbleUp(idx: number): void { + while (idx > 0) { + const parent = (idx - 1) >> 1 + if (this.data[parent].count <= this.data[idx].count) { + break + } + [this.data[parent], this.data[idx]] = [this.data[idx], this.data[parent]] + idx = parent + } + } + + private sinkDown(idx: number): void { + const n = this.data.length + // eslint-disable-next-line no-constant-condition + while(true) { + let smallest = idx + const left = 2 * idx + 1 + const right = 2 * idx + 2 + + if (left < n && this.data[left].count < this.data[smallest].count) { + smallest = left + } + if (right < n && this.data[right].count < this.data[smallest].count) { + smallest = right + } + if (smallest === idx) { + break + } + [this.data[smallest], this.data[idx]] = [this.data[idx], this.data[smallest]] + idx = smallest + } + } +} + + +interface ICombinedEventRow { + agg_type: 'kind' | 'talker' | 'bucket' + event_kind: number | null + pubkey: string | null + bucket_epoch: string | number | null + count: string | number +} + +interface IEventCursorRow { + first_seen: string + id: string +} + +interface IUserSnapshotRow { + pubkey: string | Buffer + is_admitted: boolean | number | string + balance: string | number | null + updated_at_epoch: string | number +} + +interface IUserCursorRow { + updated_at_epoch: string | number + pubkey: string | Buffer +} + + +const toNumber = (value: unknown): number => { + if (typeof value === 'number') { + return value + } + if (typeof value === 'string' && value !== '') { + return Number(value) + } + return 0 +} + +const toBoolean = (value: unknown): boolean => { + if (typeof value === 'boolean') { + return value + } + if (typeof value === 'number') { + return value === 1 + } + if (typeof value === 'string') { + return value === '1' || value.toLowerCase() === 'true' || value.toLowerCase() === 't' + } + return false +} + +const normalizeText = (value: unknown): string => { + if (Buffer.isBuffer(value)) { + return value.toString('hex') + } + return String(value) +} + + +interface IEventCursor { + firstSeen: string + id: string +} + +interface IUserCursor { + updatedAtEpoch: number + pubkey: string +} + +interface IUserState { + isAdmitted: boolean + balanceMillisats: number +} + +export class IncrementalKPICollectorService { + private readonly trackedKindsSet: Set + + // All-time talker counts stored in a Map; the MinHeap is rebuilt on each + // collectMetrics() call from this map — keeping O(N) space in the map while + // heap work is O(N log K) per cycle instead of O(N log N). + private readonly allTimeTalkerCounts = new Map() + + private readonly eventsByKindCounts = new Map() + + // Recent bucket data: minuteEpoch → pubkey → count (pruned each cycle) + private readonly recentBucketTalkerCounts = new Map>() + + private readonly userStates = new Map() + + private admittedUsers = 0 + + private eventCursor: IEventCursor = { + firstSeen: '1970-01-01 00:00:00.000000', + id: '00000000-0000-0000-0000-000000000000', + } + + private initialized = false + + private satsPaidMillisats = 0 + + private userCursor: IUserCursor = { + updatedAtEpoch: 0, + pubkey: '', + } + + public constructor( + private readonly dbClient: DatabaseClient, + private readonly trackedKinds: number[] = DEFAULT_TRACKED_KINDS, + private readonly topTalkersLimit = 10, + private readonly recentDays = 3, + ) { + this.trackedKindsSet = new Set(trackedKinds) + } + + public async collectMetrics(): Promise { + if (!this.initialized) { + await this.bootstrapState() + this.initialized = true + } else { + await Promise.all([ + this.applyEventDeltas(), + this.applyUserDeltas(), + ]) + } + + this.pruneRecentBuckets() + + return { + eventsByKind: this.buildEventsByKindMetrics(), + admittedUsers: this.admittedUsers, + satsPaid: this.satsPaidMillisats / SATS_SCALE_FACTOR, + topTalkers: { + allTime: this.getTopKFromMap(this.allTimeTalkerCounts), + recent: this.getRecentTopTalkers(), + }, + } + } + + /** + * Fetches all new events since the last cursor in a single MATERIALIZED CTE + * query, then fans the rows out into three accumulators: + * • kind counts + * • all-time talker counts + * • per-minute bucket talker counts (for the recent window) + * + * Using MATERIALIZED forces a single table scan; without it PG 15 may inline + * the CTE and re-scan for each UNION ALL branch. + */ + + private async applyEventDeltas(): Promise { + const sinceMinuteEpoch = this.getWindowStartMinute() + + // Combined query: one MATERIALIZED CTE, three aggregation branches. + const combinedSql = ` + WITH new_events AS MATERIALIZED ( + SELECT event_kind, event_pubkey, first_seen + FROM events + WHERE (first_seen, id) > (?, ?) + ) + SELECT 'kind' AS agg_type, + event_kind, + NULL::text AS pubkey, + NULL::bigint AS bucket_epoch, + COUNT(*)::bigint AS count + FROM new_events + GROUP BY event_kind + + UNION ALL + + SELECT 'talker', + NULL, + encode(event_pubkey, 'hex'), + NULL, + COUNT(*)::bigint + FROM new_events + GROUP BY event_pubkey + + UNION ALL + + SELECT 'bucket', + NULL, + encode(event_pubkey, 'hex'), + extract(epoch FROM date_trunc('minute', first_seen))::bigint, + COUNT(*)::bigint + FROM new_events + WHERE first_seen >= to_timestamp(?) + GROUP BY event_pubkey, date_trunc('minute', first_seen); + ` + + // Cursor update: a separate lightweight query on the indexed (first_seen, id) pair. + const cursorSql = ` + SELECT to_char(first_seen, 'YYYY-MM-DD HH24:MI:SS.US') AS first_seen, id + FROM events + WHERE (first_seen, id) > (?, ?) + ORDER BY first_seen DESC, id DESC + LIMIT 1; + ` + + const [combinedRows, cursorRows] = await Promise.all([ + this.queryRows(combinedSql, [ + this.eventCursor.firstSeen, + this.eventCursor.id, + sinceMinuteEpoch * 60, + ]), + this.queryRows(cursorSql, [ + this.eventCursor.firstSeen, + this.eventCursor.id, + ]), + ]) + + for (const row of combinedRows) { + const count = toNumber(row.count) + + if (row.agg_type === 'kind') { + const kind = this.trackedKindsSet.has(toNumber(row.event_kind)) + ? String(row.event_kind) + : 'other' + this.eventsByKindCounts.set(kind, (this.eventsByKindCounts.get(kind) ?? 0) + count) + } else if (row.agg_type === 'talker') { + const pubkey = row.pubkey ?? '' + this.allTimeTalkerCounts.set(pubkey, (this.allTimeTalkerCounts.get(pubkey) ?? 0) + count) + } else if (row.agg_type === 'bucket') { + const minuteEpoch = toNumber(row.bucket_epoch) + const pubkey = row.pubkey ?? '' + const bucket = this.recentBucketTalkerCounts.get(minuteEpoch) ?? new Map() + bucket.set(pubkey, (bucket.get(pubkey) ?? 0) + count) + this.recentBucketTalkerCounts.set(minuteEpoch, bucket) + } + } + + if (cursorRows.length > 0) { + this.eventCursor = { + firstSeen: cursorRows[0].first_seen, + id: cursorRows[0].id, + } + } + } + + private async applyUserDeltas(): Promise { + const [changedUsers, latestUserCursor] = await Promise.all([ + this.queryRows( + ` + SELECT + encode(pubkey, 'hex') AS pubkey, + is_admitted, + balance, + extract(epoch FROM updated_at)::bigint AS updated_at_epoch + FROM users + WHERE (extract(epoch FROM updated_at)::bigint, encode(pubkey, 'hex')) > (?, ?) + ORDER BY updated_at ASC, pubkey ASC; + `, + [this.userCursor.updatedAtEpoch, this.userCursor.pubkey], + ), + this.queryRows( + ` + SELECT + extract(epoch FROM updated_at)::bigint AS updated_at_epoch, + encode(pubkey, 'hex') AS pubkey + FROM users + WHERE (extract(epoch FROM updated_at)::bigint, encode(pubkey, 'hex')) > (?, ?) + ORDER BY updated_at DESC, pubkey DESC + LIMIT 1; + `, + [this.userCursor.updatedAtEpoch, this.userCursor.pubkey], + ), + ]) + + for (const row of changedUsers) { + const pubkey = normalizeText(row.pubkey) + const nextState: IUserState = { + isAdmitted: toBoolean(row.is_admitted), + balanceMillisats: toNumber(row.balance), + } + + const previousState = this.userStates.get(pubkey) + if (previousState?.isAdmitted) { + this.admittedUsers -= 1 + this.satsPaidMillisats -= previousState.balanceMillisats + } + + if (nextState.isAdmitted) { + this.admittedUsers += 1 + this.satsPaidMillisats += nextState.balanceMillisats + } + + this.userStates.set(pubkey, nextState) + } + + if (latestUserCursor.length > 0) { + this.userCursor = { + updatedAtEpoch: toNumber(latestUserCursor[0].updated_at_epoch), + pubkey: normalizeText(latestUserCursor[0].pubkey), + } + } + } + + /** + * The bootstrap runs only for the first time and + * Same MATERIALIZED CTE strategy as applyEventDeltas but without the cursor + * predicate (reads entire table on first run). + */ + private async bootstrapState(): Promise { + debug('bootstrapping incremental KPI collector state') + + this.resetState() + + const sinceMinuteEpoch = this.getWindowStartMinute() + + const bootstrapEventsSql = ` + WITH all_events AS MATERIALIZED ( + SELECT event_kind, event_pubkey, first_seen + FROM events + ) + SELECT 'kind' AS agg_type, + event_kind, + NULL::text AS pubkey, + NULL::bigint AS bucket_epoch, + COUNT(*)::bigint AS count + FROM all_events + GROUP BY event_kind + + UNION ALL + + SELECT 'talker', + NULL, + encode(event_pubkey, 'hex'), + NULL, + COUNT(*)::bigint + FROM all_events + GROUP BY event_pubkey + + UNION ALL + + SELECT 'bucket', + NULL, + encode(event_pubkey, 'hex'), + extract(epoch FROM date_trunc('minute', first_seen))::bigint, + COUNT(*)::bigint + FROM all_events + WHERE first_seen >= to_timestamp(?) + GROUP BY event_pubkey, date_trunc('minute', first_seen); + ` + + const [combinedRows, eventCursorRows, userRows, userCursorRows] = await Promise.all([ + this.queryRows(bootstrapEventsSql, [sinceMinuteEpoch * 60]), + this.queryRows( + ` + SELECT to_char(first_seen, 'YYYY-MM-DD HH24:MI:SS.US') AS first_seen, id + FROM events + ORDER BY first_seen DESC, id DESC + LIMIT 1; + `, + [], + ), + // Only load admitted users at bootstrap — avoids unbounded memory growth + // from loading every user row into the in-memory map. + this.queryRows( + ` + SELECT + encode(pubkey, 'hex') AS pubkey, + is_admitted, + balance, + extract(epoch FROM updated_at)::bigint AS updated_at_epoch + FROM users + WHERE is_admitted = true; + `, + [], + ), + this.queryRows( + ` + SELECT + extract(epoch FROM updated_at)::bigint AS updated_at_epoch, + encode(pubkey, 'hex') AS pubkey + FROM users + ORDER BY updated_at DESC, pubkey DESC + LIMIT 1; + `, + [], + ), + ]) + + for (const row of combinedRows) { + const count = toNumber(row.count) + + if (row.agg_type === 'kind') { + const kind = this.trackedKindsSet.has(toNumber(row.event_kind)) + ? String(row.event_kind) + : 'other' + this.eventsByKindCounts.set(kind, (this.eventsByKindCounts.get(kind) ?? 0) + count) + } else if (row.agg_type === 'talker') { + const pubkey = row.pubkey ?? '' + this.allTimeTalkerCounts.set(pubkey, (this.allTimeTalkerCounts.get(pubkey) ?? 0) + count) + } else if (row.agg_type === 'bucket') { + const minuteEpoch = toNumber(row.bucket_epoch) + const pubkey = row.pubkey ?? '' + const bucket = this.recentBucketTalkerCounts.get(minuteEpoch) ?? new Map() + bucket.set(pubkey, (bucket.get(pubkey) ?? 0) + count) + this.recentBucketTalkerCounts.set(minuteEpoch, bucket) + } + } + + // Bootstrap only admitted users (memory-safe for large relays). + for (const row of userRows) { + const pubkey = normalizeText(row.pubkey) + const userState: IUserState = { + isAdmitted: true, // filtered in query + balanceMillisats: toNumber(row.balance), + } + this.userStates.set(pubkey, userState) + this.admittedUsers += 1 + this.satsPaidMillisats += userState.balanceMillisats + } + + if (eventCursorRows.length > 0) { + this.eventCursor = { + firstSeen: eventCursorRows[0].first_seen, + id: eventCursorRows[0].id, + } + } + + if (userCursorRows.length > 0) { + this.userCursor = { + updatedAtEpoch: toNumber(userCursorRows[0].updated_at_epoch), + pubkey: normalizeText(userCursorRows[0].pubkey), + } + } + } + + private buildEventsByKindMetrics(): EventsByKindCount[] { + const eventsByKind = Array + .from(this.eventsByKindCounts.entries()) + .filter(([kind]) => kind !== 'other') + .map(([kind, count]) => ({ kind, count })) + .sort((a, b) => b.count - a.count) + + eventsByKind.push({ + kind: 'other', + count: this.eventsByKindCounts.get('other') ?? 0, + }) + + return eventsByKind + } + + /** + * Builds Top-K talkers using a MinHeap (O(N log K)) instead of a full sort + * (O(N log N)). For large relays with millions of distinct pubkeys this is a + * significant speedup and the heap is bounded to K entries. + */ + private getTopKFromMap(counts: Map): TopTalker[] { + const heap = new MinHeap(this.topTalkersLimit) + + for (const [pubkey, count] of counts) { + heap.push({ pubkey, count }) + } + + return heap.toSortedDescArray() + } + + private getRecentTopTalkers(): TopTalker[] { + // Merge all per-minute buckets into a single counts map, then run Top-K. + const merged = new Map() + + for (const bucketCounts of this.recentBucketTalkerCounts.values()) { + for (const [pubkey, count] of bucketCounts) { + merged.set(pubkey, (merged.get(pubkey) ?? 0) + count) + } + } + + return this.getTopKFromMap(merged) + } + + private getWindowStartMinute(): number { + const windowMinutes = this.recentDays * MINUTES_PER_DAY + const nowMinute = Math.floor(Date.now() / 60000) + return nowMinute - windowMinutes + } + + private pruneRecentBuckets(): void { + const thresholdMinute = this.getWindowStartMinute() + + for (const bucketMinute of this.recentBucketTalkerCounts.keys()) { + if (bucketMinute < thresholdMinute) { + this.recentBucketTalkerCounts.delete(bucketMinute) + } + } + } + + private async queryRows(sql: string, bindings: unknown[]): Promise { + const rawResult = await this.dbClient.raw(sql, bindings) + + if (Array.isArray((rawResult as any).rows)) { + return (rawResult as any).rows as T[] + } + + if (Array.isArray(rawResult)) { + return rawResult as unknown as T[] + } + + return [] + } + + private resetState(): void { + this.allTimeTalkerCounts.clear() + this.eventsByKindCounts.clear() + this.recentBucketTalkerCounts.clear() + this.userStates.clear() + this.admittedUsers = 0 + this.satsPaidMillisats = 0 + this.eventCursor = { + firstSeen: '1970-01-01 00:00:00.000000', + id: '00000000-0000-0000-0000-000000000000', + } + this.userCursor = { + updatedAtEpoch: 0, + pubkey: '', + } + } +} From 583773d571cb20fd6467bd4ef58d4637a79aa2aa Mon Sep 17 00:00:00 2001 From: Kushagra Date: Sun, 12 Apr 2026 04:27:27 +0530 Subject: [PATCH 09/12] feat(dashboard): Added PSQL updation based conditional polling --- .../services/stateful-incremental-service.ts | 239 ++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 src/dashboard-service/services/stateful-incremental-service.ts diff --git a/src/dashboard-service/services/stateful-incremental-service.ts b/src/dashboard-service/services/stateful-incremental-service.ts new file mode 100644 index 00000000..2638dace --- /dev/null +++ b/src/dashboard-service/services/stateful-incremental-service.ts @@ -0,0 +1,239 @@ +import { Client, ClientConfig } from 'pg' +import { createLogger } from '../../factories/logger-factory' +import { DashboardMetrics } from '../types' +import { DatabaseClient } from '../../@types/base' +import { IncrementalKPICollectorService } from './incremental-kpi-collector-service' + +const debug = createLogger('dashboard-service:stateful-incremental-kpi-collector') + +const DEFAULT_EVENTS_CHANNEL = 'dashboard_events_changed' +const DEFAULT_USERS_CHANNEL = 'dashboard_users_changed' + +const isValidChannelName = (channel: string): boolean => { + return /^[a-zA-Z_][a-zA-Z0-9_]*$/.test(channel) +} + +const getListenerConnectionConfig = (): ClientConfig => { + if (process.env.DB_URI) { + return { + connectionString: process.env.DB_URI, + } + } + + return { + host: process.env.DB_HOST, + port: Number(process.env.DB_PORT), + user: process.env.DB_USER, + password: process.env.DB_PASSWORD, + database: process.env.DB_NAME, + } +} + +const defaultMetrics = (): DashboardMetrics => { + return { + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { + allTime: [], + recent: [], + }, + } +} + +export class StatefulIncrementalKPICollectorService { + private cachedMetrics: DashboardMetrics = defaultMetrics() + + private hasCache = false + + private isDirty = true + + private isListenerReady = false + + /** Set to true permanently once close() is called — prevents reconnect loops after shutdown. */ + private isClosed = false + + private listenerClient: Client | undefined + + private reconnectTimer: ReturnType | undefined + + private readonly incrementalCollector: IncrementalKPICollectorService + + private readonly channels: string[] + + private static readonly BASE_DELAY_MS = 500 + private static readonly MAX_DELAY_MS = 30_000 + + /** Backoff state — reset to BASE_DELAY_MS on every successful connect. */ + private reconnectDelayMs = StatefulIncrementalKPICollectorService.BASE_DELAY_MS + + public constructor( + dbClient: DatabaseClient, + trackedKinds?: number[], + topTalkersLimit?: number, + recentDays?: number, + ) { + this.incrementalCollector = new IncrementalKPICollectorService( + dbClient, + trackedKinds, + topTalkersLimit, + recentDays, + ) + + this.channels = [ + process.env.DASHBOARD_EVENTS_NOTIFY_CHANNEL ?? DEFAULT_EVENTS_CHANNEL, + process.env.DASHBOARD_USERS_NOTIFY_CHANNEL ?? DEFAULT_USERS_CHANNEL, + ] + } + + public async collectMetrics(): Promise { + // Kick off a connect attempt if the listener isn't alive yet. + // We don't await here — the listener is best-effort; data comes from the + // incremental collector regardless. + if (!this.isListenerReady && !this.listenerClient) { + this.scheduleReconnect(0) + } + + if (!this.hasCache || this.isDirty) { + this.cachedMetrics = await this.incrementalCollector.collectMetrics() + this.hasCache = true + this.isDirty = false + } + + return this.cachedMetrics + } + + public async close(): Promise { + this.isClosed = true + + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer) + this.reconnectTimer = undefined + } + + const client = this.listenerClient + this.listenerClient = undefined + this.isListenerReady = false + + if (!client) { + return + } + + for (const channel of this.channels) { + if (!isValidChannelName(channel)) { + continue + } + + try { + await client.query(`UNLISTEN ${channel}`) + } catch (error) { + console.error('dashboard-service: failed to unlisten channel', { + channel, + error, + }) + } + } + + client.removeAllListeners('notification') + client.removeAllListeners('error') + client.removeAllListeners('end') + + try { + await client.end() + } catch (error) { + console.error('dashboard-service: failed to close stateful incremental collector listener', error) + } + } + + + /** + * Schedule a reconnect attempt after `delayMs` milliseconds. + * Passing 0 connects immediately (used on first call and after close via `close()`). + */ + private scheduleReconnect(delayMs: number): void { + if (this.isClosed || this.reconnectTimer || this.isListenerReady) { + return + } + + debug('scheduling listener reconnect in %d ms', delayMs) + + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = undefined + this.connectListener().catch((err) => { + // connectListener already logs; just ensure the loop continues. + debug('connectListener threw unexpectedly: %o', err) + }) + }, delayMs) + } + + private async connectListener(): Promise { + if (this.isClosed || this.isListenerReady) { + return + } + + const client = new Client(getListenerConnectionConfig()) + + client.on('notification', (notification) => { + if (!notification.channel || !this.channels.includes(notification.channel)) { + return + } + + this.isDirty = true + debug('received postgres notification on channel=%s', notification.channel) + }) + + client.on('error', (error) => { + this.isDirty = true + this.isListenerReady = false + console.error('dashboard-service: stateful incremental collector listener error', error) + // Don't call scheduleReconnect here — 'end' will always fire after 'error' + // on a pg.Client, so we reconnect from the 'end' handler to avoid double-scheduling. + }) + + client.on('end', () => { + this.isDirty = true + this.isListenerReady = false + this.listenerClient = undefined + debug('postgres stateful incremental collector listener ended — will reconnect in %d ms', this.reconnectDelayMs) + + if (!this.isClosed) { + this.scheduleReconnect(this.reconnectDelayMs) + // Exponential backoff, capped at MAX_DELAY_MS. + this.reconnectDelayMs = Math.min( + this.reconnectDelayMs * 2, + StatefulIncrementalKPICollectorService.MAX_DELAY_MS, + ) + } + }) + + try { + await client.connect() + + for (const channel of this.channels) { + if (!isValidChannelName(channel)) { + console.error('dashboard-service: skipping invalid notify channel name', channel) + continue + } + + await client.query(`LISTEN ${channel}`) + } + + this.listenerClient = client + this.isListenerReady = true + // Reset backoff on successful connect. + this.reconnectDelayMs = StatefulIncrementalKPICollectorService.BASE_DELAY_MS + debug('postgres stateful incremental collector listener initialized for channels=%o', this.channels) + } catch (error) { + this.isDirty = true + this.listenerClient = undefined + this.isListenerReady = false + console.error('dashboard-service: unable to initialize stateful incremental collector listener', error) + + try { + await client.end() + } catch (_closeError) { + // best effort — 'end' handler above will fire and schedule the next reconnect + } + } + } +} From 65f23a0faad9ed536731f5648048f1da4b91c1f1 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Sun, 12 Apr 2026 04:28:25 +0530 Subject: [PATCH 10/12] feat(dashboard): Wired up new collectors --- src/dashboard-service/app.ts | 67 ++++++++++++++--- src/dashboard-service/config.ts | 30 ++++++++ .../services/snapshot-service.ts | 71 +++++++++++++++---- src/dashboard-service/types.ts | 24 +++++-- 4 files changed, 164 insertions(+), 28 deletions(-) diff --git a/src/dashboard-service/app.ts b/src/dashboard-service/app.ts index fd19a5cf..e4970567 100644 --- a/src/dashboard-service/app.ts +++ b/src/dashboard-service/app.ts @@ -1,3 +1,5 @@ +import { getMasterDbClient, getReadReplicaDbClient } from '../database/client' +import { IKPICollector, SnapshotService } from './services/snapshot-service' import { createDashboardRouter } from './api/dashboard-router' import { createLogger } from '../factories/logger-factory' import { DashboardServiceConfig } from './config' @@ -5,10 +7,11 @@ import { DashboardWebSocketHub } from './ws/dashboard-ws-hub' import express from 'express' import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler' import http from 'http' +import { IncrementalKPICollectorService } from './services/incremental-kpi-collector-service' +import { KPICollectorService } from './services/kpi-collector-service' import { PollingScheduler } from './polling/polling-scheduler' -import { SnapshotService } from './services/snapshot-service' +import { StatefulIncrementalKPICollectorService } from './services/stateful-incremental-service' import { WebSocketServer } from 'ws' - const debug = createLogger('dashboard-service:app') export interface DashboardService { @@ -22,14 +25,36 @@ export interface DashboardService { export const createDashboardService = (config: DashboardServiceConfig): DashboardService => { console.info( - 'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d)', + 'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d, useDummyData=%s, collectorMode=%s)', config.host, config.port, config.wsPath, config.pollIntervalMs, + config.useDummyData, + config.collectorMode, ) - const snapshotService = new SnapshotService() + const collector: IKPICollector = config.useDummyData + ? { + collectMetrics: async () => ({ + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { allTime: [], recent: [] }, + }), + } + : (() => { + if (config.collectorMode === 'stateful-incremental') { + return new StatefulIncrementalKPICollectorService(getMasterDbClient()) + } + + const dbClient = getReadReplicaDbClient() + return config.collectorMode === 'incremental' + ? new IncrementalKPICollectorService(dbClient) + : new KPICollectorService(dbClient) + })() + + const snapshotService = new SnapshotService(collector) const app = express() .disable('x-powered-by') @@ -44,11 +69,17 @@ export const createDashboardService = (config: DashboardServiceConfig): Dashboar const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot()) - const pollingScheduler = new PollingScheduler(config.pollIntervalMs, () => { - const nextSnapshot = snapshotService.refreshPlaceholder() - debug('poll tick produced snapshot sequence=%d', nextSnapshot.sequence) - webSocketHub.broadcastTick(nextSnapshot.sequence) - webSocketHub.broadcastSnapshot(nextSnapshot) + const pollingScheduler = new PollingScheduler(config.pollIntervalMs, async () => { + const { snapshot, changed } = await snapshotService.refresh() + + if (!changed) { + debug('poll tick detected no KPI changes') + return + } + + debug('poll tick produced snapshot sequence=%d status=%s', snapshot.sequence, snapshot.status) + webSocketHub.broadcastTick(snapshot.sequence) + webSocketHub.broadcastSnapshot(snapshot) }) const start = async () => { @@ -72,6 +103,15 @@ export const createDashboardService = (config: DashboardServiceConfig): Dashboar }) }) + try { + const initialSnapshotRefresh = await snapshotService.refresh() + if (initialSnapshotRefresh.changed) { + debug('initial snapshot prepared with sequence=%d status=%s', initialSnapshotRefresh.snapshot.sequence, initialSnapshotRefresh.snapshot.status) + } + } catch (error) { + console.error('dashboard-service: initial snapshot refresh failed (will retry on next poll)', error) + } + pollingScheduler.start() console.info('dashboard-service: polling scheduler started') } @@ -79,6 +119,15 @@ export const createDashboardService = (config: DashboardServiceConfig): Dashboar const stop = async () => { console.info('dashboard-service: stopping service') pollingScheduler.stop() + + if (collector?.close) { + try { + await collector.close() + } catch (error) { + console.error('dashboard-service: failed to close collector resources', error) + } + } + webSocketHub.close() await new Promise((resolve, reject) => { if (!webServer.listening) { diff --git a/src/dashboard-service/config.ts b/src/dashboard-service/config.ts index 3d888ae8..21872984 100644 --- a/src/dashboard-service/config.ts +++ b/src/dashboard-service/config.ts @@ -3,6 +3,18 @@ export interface DashboardServiceConfig { port: number wsPath: string pollIntervalMs: number + useDummyData: boolean + collectorMode: DashboardCollectorMode +} + +export type DashboardCollectorMode = 'full' | 'incremental' | 'stateful-incremental' + +const parseBoolean = (value: string | undefined, fallback = false): boolean => { + if (typeof value === 'undefined') { + return fallback + } + + return value === '1' || value.toLowerCase() === 'true' } const parseInteger = (value: string | undefined, fallback: number): number => { @@ -18,11 +30,29 @@ const parseInteger = (value: string | undefined, fallback: number): number => { return parsed } +const parseCollectorMode = ( + value: string | undefined, + fallback: DashboardCollectorMode = 'full', +): DashboardCollectorMode => { + if (typeof value === 'undefined') { + return fallback + } + + const normalized = value.toLowerCase() + if (normalized === 'full' || normalized === 'incremental' || normalized === 'stateful-incremental') { + return normalized + } + + return fallback +} + export const getDashboardServiceConfig = (): DashboardServiceConfig => { return { host: process.env.DASHBOARD_SERVICE_HOST ?? '127.0.0.1', port: parseInteger(process.env.DASHBOARD_SERVICE_PORT, 8011), wsPath: process.env.DASHBOARD_WS_PATH ?? '/api/v1/kpis/stream', pollIntervalMs: parseInteger(process.env.DASHBOARD_POLL_INTERVAL_MS, 5000), + useDummyData: parseBoolean(process.env.DASHBOARD_USE_DUMMY_DATA, false), + collectorMode: parseCollectorMode(process.env.DASHBOARD_COLLECTOR_MODE, 'full'), } } diff --git a/src/dashboard-service/services/snapshot-service.ts b/src/dashboard-service/services/snapshot-service.ts index b1a511fe..d846269f 100644 --- a/src/dashboard-service/services/snapshot-service.ts +++ b/src/dashboard-service/services/snapshot-service.ts @@ -1,34 +1,81 @@ -import { KPISnapshot } from '../types' +import { DashboardMetrics, KPISnapshot } from '../types' +import { createLogger } from '../../factories/logger-factory' + +const debug = createLogger('dashboard-service:snapshot-service') + +const defaultMetrics = (): DashboardMetrics => ({ + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { + allTime: [], + recent: [], + }, +}) + +export interface ISnapshotRefreshResult { + snapshot: KPISnapshot + changed: boolean +} + +export interface IKPICollector { + collectMetrics(): Promise + close?(): Promise | void +} export class SnapshotService { + private metricsFingerprint = JSON.stringify(defaultMetrics()) + private sequence = 0 private snapshot: KPISnapshot = { sequence: this.sequence, generatedAt: new Date(0).toISOString(), - status: 'placeholder', - metrics: { - eventsByKind: [], - admittedUsers: null, - satsPaid: null, - topTalkers: [], - }, + status: 'live', + metrics: defaultMetrics(), } + public constructor(private readonly collector: IKPICollector) { } + public getSnapshot(): KPISnapshot { return this.snapshot } - // Phase 1 placeholder: advances sequence/time so polling and websocket flow can be validated end-to-end. - public refreshPlaceholder(): KPISnapshot { + /** + * Fetches fresh metrics from the collector and updates the snapshot if the + * metrics have changed. Throws if the collector is unavailable — callers + * are responsible for catching and deciding how to surface errors. + */ + public async refresh(): Promise { + const metrics = await this.collector.collectMetrics() + const nextFingerprint = JSON.stringify(metrics) + + if (nextFingerprint === this.metricsFingerprint && this.snapshot.status === 'live') { + debug('metrics unchanged, skipping snapshot sequence update') + return { + snapshot: this.snapshot, + changed: false, + } + } + + this.metricsFingerprint = nextFingerprint + + return this.updateSnapshot(metrics, 'live') + } + + private updateSnapshot(metrics: DashboardMetrics, status: 'live' | 'stale'): ISnapshotRefreshResult { this.sequence += 1 this.snapshot = { - ...this.snapshot, sequence: this.sequence, generatedAt: new Date().toISOString(), + status, + metrics, } - return this.snapshot + return { + snapshot: this.snapshot, + changed: true, + } } } diff --git a/src/dashboard-service/types.ts b/src/dashboard-service/types.ts index 02885a36..15165b30 100644 --- a/src/dashboard-service/types.ts +++ b/src/dashboard-service/types.ts @@ -3,16 +3,26 @@ export interface TopTalker { count: number } +export interface EventsByKindCount { + kind: string + count: number +} + +export interface DashboardMetrics { + eventsByKind: EventsByKindCount[] + admittedUsers: number + satsPaid: number + topTalkers: { + allTime: TopTalker[] + recent: TopTalker[] + } +} + export interface KPISnapshot { sequence: number generatedAt: string - status: 'placeholder' - metrics: { - eventsByKind: Array<{ kind: string, count: number }> - admittedUsers: number | null - satsPaid: number | null - topTalkers: TopTalker[] - } + status: 'live' | 'stale' + metrics: DashboardMetrics } export interface DashboardSnapshotResponse { From 02e6a2b694035023442a265ab4797ae8ed82af68 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Sun, 12 Apr 2026 04:30:30 +0530 Subject: [PATCH 11/12] feat(dashboard): Added New test cases for collector services and updated polling services --- package.json | 1 - .../polling/polling-scheduler.ts | 52 ++++++--- test/unit/dashboard-service/app.spec.ts | 2 + .../polling-scheduler.spec.ts | 62 +++++++++- .../snapshot-service.spec.ts | 107 ++++++++++++++++++ 5 files changed, 208 insertions(+), 16 deletions(-) create mode 100644 test/unit/dashboard-service/snapshot-service.spec.ts diff --git a/package.json b/package.json index 7039f1b7..e78a3b12 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,6 @@ "pretest:unit": "mkdir -p .test-reports/unit", "test:unit": "mocha 'test/**/*.spec.ts'", "test:unit:dashboard": "mocha 'test/unit/dashboard-service/**/*.spec.ts'", - "test:e2e:dashboard:ws": "node scripts/check_dashboard_ws_updates.js", "test:unit:watch": "npm run test:unit -- --min --watch --watch-files src/**/*,test/**/*", "cover:unit": "nyc --report-dir .coverage/unit npm run test:unit", "docker:build": "docker build -t nostream .", diff --git a/src/dashboard-service/polling/polling-scheduler.ts b/src/dashboard-service/polling/polling-scheduler.ts index c24898d9..58663adb 100644 --- a/src/dashboard-service/polling/polling-scheduler.ts +++ b/src/dashboard-service/polling/polling-scheduler.ts @@ -4,8 +4,15 @@ type Tick = () => Promise | void const debug = createLogger('dashboard-service:polling') +/** + * Runs a tick callback on a fixed cadence, but — unlike setInterval — never + * overlaps: the next tick is only scheduled *after* the current one resolves + * or rejects. This prevents DB query storms when a poll takes longer than the + * configured interval. + */ export class PollingScheduler { - private timer: NodeJS.Timer | undefined + private timer: NodeJS.Timeout | undefined + private running = false public constructor( private readonly intervalMs: number, @@ -13,31 +20,50 @@ export class PollingScheduler { ) { } public start(): void { - if (this.timer) { + if (this.running) { return } + this.running = true debug('starting scheduler with interval %d ms', this.intervalMs) - - this.timer = setInterval(() => { - Promise.resolve(this.tick()) - .catch((error) => { - console.error('dashboard-service: polling tick failed', error) - }) - }, this.intervalMs) + this.scheduleNext() } public stop(): void { - if (!this.timer) { + if (!this.running) { return } debug('stopping scheduler') - clearInterval(this.timer) - this.timer = undefined + this.running = false + + if (this.timer) { + clearTimeout(this.timer) + this.timer = undefined + } } public isRunning(): boolean { - return Boolean(this.timer) + return this.running + } + + private scheduleNext(): void { + if (!this.running) { + return + } + + this.timer = setTimeout(() => { + this.timer = undefined + + Promise.resolve(this.tick()) + .catch((error) => { + console.error('dashboard-service: polling tick failed', error) + }) + .finally(() => { + // Schedule the next tick only after the current one completes, + // regardless of success or failure. + this.scheduleNext() + }) + }, this.intervalMs) } } diff --git a/test/unit/dashboard-service/app.spec.ts b/test/unit/dashboard-service/app.spec.ts index 67a092d6..f8e81331 100644 --- a/test/unit/dashboard-service/app.spec.ts +++ b/test/unit/dashboard-service/app.spec.ts @@ -10,6 +10,8 @@ describe('dashboard-service app', () => { port: 0, wsPath: '/api/v1/kpis/stream', pollIntervalMs: 1000, + useDummyData: true, + collectorMode: 'full', }) await service.start() diff --git a/test/unit/dashboard-service/polling-scheduler.spec.ts b/test/unit/dashboard-service/polling-scheduler.spec.ts index 28d1c929..8855fcb9 100644 --- a/test/unit/dashboard-service/polling-scheduler.spec.ts +++ b/test/unit/dashboard-service/polling-scheduler.spec.ts @@ -14,6 +14,17 @@ describe('PollingScheduler', () => { clock.restore() }) + /** + * The scheduler uses recursive setTimeout (not setInterval), so each tick + * is only enqueued after the previous one resolves. With instant-resolving + * stubs the sequence is: + * T=0 start() → schedules tick at T=1000 + * T=1000 tick #1 resolves → schedules tick at T=2000 + * T=2000 tick #2 resolves → schedules tick at T=3000 + * T=3000 tick #3 resolves → schedules tick at T=4000 + * tickAsync drives the microtask queue between timer firings, so all three + * ticks complete inside tickAsync(3000). + */ it('runs tick callback on interval while running', async () => { const tick = Sinon.stub().resolves(undefined) const scheduler = new PollingScheduler(1000, tick) @@ -30,10 +41,57 @@ describe('PollingScheduler', () => { const scheduler = new PollingScheduler(500, tick) scheduler.start() - await clock.tickAsync(1000) + await clock.tickAsync(1000) // ticks at 500ms, 1000ms → 2 calls scheduler.stop() - await clock.tickAsync(1000) + await clock.tickAsync(1000) // no more ticks after stop expect(tick.callCount).to.equal(2) }) + + it('does not overlap ticks when callback is slow', async () => { + // Tick takes 1500ms — longer than the 1000ms interval. + // With setInterval this would cause overlap; with recursive setTimeout it must not. + let running = 0 + let maxConcurrent = 0 + + const tick = Sinon.stub().callsFake(async () => { + running++ + maxConcurrent = Math.max(maxConcurrent, running) + await clock.tickAsync(1500) + running-- + }) + + const scheduler = new PollingScheduler(1000, tick) + scheduler.start() + // Drive enough time for two potential overlapping cycles + await clock.tickAsync(4000) + scheduler.stop() + + expect(maxConcurrent).to.equal(1, 'ticks must not run concurrently') + }) + + it('continues scheduling after a failed tick', async () => { + const tick = Sinon.stub() + .onFirstCall().rejects(new Error('transient error')) + .resolves(undefined) + + const scheduler = new PollingScheduler(1000, tick) + scheduler.start() + await clock.tickAsync(3000) + scheduler.stop() + + // First tick rejects, but the scheduler must recover and run two more. + expect(tick.callCount).to.be.greaterThanOrEqual(2) + }) + + it('isRunning reflects scheduler state', () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(1000, tick) + + expect(scheduler.isRunning()).to.equal(false) + scheduler.start() + expect(scheduler.isRunning()).to.equal(true) + scheduler.stop() + expect(scheduler.isRunning()).to.equal(false) + }) }) diff --git a/test/unit/dashboard-service/snapshot-service.spec.ts b/test/unit/dashboard-service/snapshot-service.spec.ts new file mode 100644 index 00000000..a1acba8a --- /dev/null +++ b/test/unit/dashboard-service/snapshot-service.spec.ts @@ -0,0 +1,107 @@ +import chai, { expect } from 'chai' +import chaiAsPromised from 'chai-as-promised' +import Sinon from 'sinon' + +import { IKPICollector, SnapshotService } from '../../../src/dashboard-service/services/snapshot-service' +import { DashboardMetrics } from '../../../src/dashboard-service/types' + +chai.use(chaiAsPromised) + +const createMetrics = (overrides: Partial = {}): DashboardMetrics => ({ + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { + allTime: [], + recent: [], + }, + ...overrides, +}) + +const makeCollector = (stub: Sinon.SinonStub): IKPICollector => ({ + collectMetrics: stub, +}) + +describe('SnapshotService', () => { + let sandbox: Sinon.SinonSandbox + + beforeEach(() => { + sandbox = Sinon.createSandbox() + }) + + afterEach(() => { + sandbox.restore() + }) + + it('updates snapshot when collected metrics change', async () => { + const firstMetrics = createMetrics({ admittedUsers: 1 }) + const nextMetrics = createMetrics({ admittedUsers: 2 }) + + const stub = sandbox.stub() + .onFirstCall().resolves(firstMetrics) + .onSecondCall().resolves(firstMetrics) + .onThirdCall().resolves(nextMetrics) + + const service = new SnapshotService(makeCollector(stub)) + + const first = await service.refresh() + expect(first.changed).to.equal(true, 'first refresh should report changed') + expect(first.snapshot.sequence).to.equal(1) + expect(first.snapshot.status).to.equal('live') + + const second = await service.refresh() + expect(second.changed).to.equal(false, 'second refresh with same metrics should not change') + expect(second.snapshot.sequence).to.equal(1, 'sequence must not advance when metrics are unchanged') + + const third = await service.refresh() + expect(third.changed).to.equal(true, 'third refresh with new metrics should report changed') + expect(third.snapshot.sequence).to.equal(2) + expect(third.snapshot.metrics.admittedUsers).to.equal(2) + }) + + it('does not advance sequence when metrics are identical across refreshes', async () => { + const metrics = createMetrics({ satsPaid: 100 }) + const stub = sandbox.stub().resolves(metrics) + + const service = new SnapshotService(makeCollector(stub)) + + const first = await service.refresh() + expect(first.changed).to.equal(true) + expect(first.snapshot.sequence).to.equal(1) + + const second = await service.refresh() + expect(second.changed).to.equal(false) + expect(second.snapshot.sequence).to.equal(1) + }) + + it('propagates collector errors to the caller', async () => { + const stub = sandbox.stub().rejects(new Error('db down')) + + const service = new SnapshotService(makeCollector(stub)) + + await expect(service.refresh()).to.be.rejectedWith('db down') + }) + + it('returns the last known snapshot via getSnapshot()', async () => { + const metrics = createMetrics({ admittedUsers: 5 }) + const stub = sandbox.stub().resolves(metrics) + + const service = new SnapshotService(makeCollector(stub)) + + await service.refresh() + + const snap = service.getSnapshot() + expect(snap.sequence).to.equal(1) + expect(snap.status).to.equal('live') + expect(snap.metrics.admittedUsers).to.equal(5) + }) + + it('sets status to live after a successful refresh', async () => { + const stub = sandbox.stub().resolves(createMetrics()) + + const service = new SnapshotService(makeCollector(stub)) + + const { snapshot } = await service.refresh() + expect(snapshot.status).to.equal('live') + }) +}) From 9648ba7e02e46f3a31e97d6cd28b6f9d30e67504 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Sun, 12 Apr 2026 05:07:28 +0530 Subject: [PATCH 12/12] feat(dashboard, migrations): Added indexes to fields for faster queries --- ...0100_add_dashboard_live_notify_triggers.js | 44 +++++++++++++++++++ ...260412_020000_add_dashboard_kpi_indexes.js | 40 +++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 migrations/20260411_210100_add_dashboard_live_notify_triggers.js create mode 100644 migrations/20260412_020000_add_dashboard_kpi_indexes.js diff --git a/migrations/20260411_210100_add_dashboard_live_notify_triggers.js b/migrations/20260411_210100_add_dashboard_live_notify_triggers.js new file mode 100644 index 00000000..6cc63c3c --- /dev/null +++ b/migrations/20260411_210100_add_dashboard_live_notify_triggers.js @@ -0,0 +1,44 @@ +exports.up = async function (knex) { + await knex.schema.raw(` + CREATE OR REPLACE FUNCTION notify_dashboard_events_changed() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('dashboard_events_changed', TG_OP); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `) + + await knex.schema.raw(` + DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events; + + CREATE TRIGGER dashboard_events_changed_trigger + AFTER INSERT OR UPDATE OR DELETE ON events + FOR EACH STATEMENT + EXECUTE FUNCTION notify_dashboard_events_changed(); + `) + + await knex.schema.raw(` + CREATE OR REPLACE FUNCTION notify_dashboard_users_changed() RETURNS TRIGGER AS $$ + BEGIN + PERFORM pg_notify('dashboard_users_changed', TG_OP); + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `) + + await knex.schema.raw(` + DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users; + + CREATE TRIGGER dashboard_users_changed_trigger + AFTER INSERT OR UPDATE OR DELETE ON users + FOR EACH STATEMENT + EXECUTE FUNCTION notify_dashboard_users_changed(); + `) +} + +exports.down = async function (knex) { + await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;') + await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;') + await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_events_changed();') + await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_users_changed();') +} diff --git a/migrations/20260412_020000_add_dashboard_kpi_indexes.js b/migrations/20260412_020000_add_dashboard_kpi_indexes.js new file mode 100644 index 00000000..63af1af2 --- /dev/null +++ b/migrations/20260412_020000_add_dashboard_kpi_indexes.js @@ -0,0 +1,40 @@ +/** + * Migration: add dashboard KPI query indexes + * + * Without these the incremental collector degrades to sequential scans: + * - idx_events_cursor → covers the (first_seen, id) cursor predicate used in every + * incremental delta query and the bootstrap cursor select. + * - idx_events_pubkey → covers the GROUP BY event_pubkey in the all-time talker query. + * - idx_users_cursor → covers the (updated_at, pubkey) cursor predicate used in the + * user delta / cursor-select queries. + * + * All three are created CONCURRENTLY so they don't lock the table on a live relay. + * knex does not support CREATE INDEX CONCURRENTLY natively, so we use raw SQL and + * set `disableTransactions` to true (DDL inside a transaction would negate CONCURRENTLY). + */ + +exports.up = async (knex) => { + await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_cursor + ON events (first_seen, id); + `) + + await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_pubkey + ON events (event_pubkey); + `) + + await knex.raw(` + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_cursor + ON users (updated_at, pubkey); + `) +} + +exports.down = async (knex) => { + await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_cursor;') + await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_pubkey;') + await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_users_cursor;') +} + +// Required so knex doesn't wrap the CONCURRENTLY statements in a transaction. +exports.config = { transaction: false }