From 057634fba8ae8b02874b732644f83ab41b8b0a2e Mon Sep 17 00:00:00 2001 From: Kushagra Date: Thu, 9 Apr 2026 04:52:52 +0530 Subject: [PATCH 1/7] fix: added expired_at filter to message pipeline --- src/handlers/subscribe-message-handler.ts | 10 ++- .../subscribe-message-handler.spec.ts | 61 ++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/src/handlers/subscribe-message-handler.ts b/src/handlers/subscribe-message-handler.ts index 1ab4e322..87fbde95 100644 --- a/src/handlers/subscribe-message-handler.ts +++ b/src/handlers/subscribe-message-handler.ts @@ -4,7 +4,7 @@ import { pipeline } from 'stream/promises' import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../utils/messages' import { IAbortable, IMessageHandler } from '../@types/message-handlers' -import { isEventMatchingFilter, toNostrEvent } from '../utils/event' +import { isEventMatchingFilter, isExpiredEvent, toNostrEvent } from '../utils/event' import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream' import { SubscriptionFilter, SubscriptionId } from '../@types/subscription' import { createLogger } from '../factories/logger-factory' @@ -55,6 +55,12 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { const sendEOSE = () => this.webSocket.emit(WebSocketAdapterEvent.Message, createEndOfStoredEventsNoticeMessage(subscriptionId)) const isSubscribedToEvent = SubscribeMessageHandler.isClientSubscribedToEvent(filters) + const isNotExpired = (event: Event)=>{ + if (isExpiredEvent(event)) { + return false + } + return true + } const findEvents = this.eventRepository.findByFilters(filters).stream() @@ -65,6 +71,7 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { findEvents, streamFilter(propSatisfies(isNil, 'deleted_at')), streamMap(toNostrEvent), + streamFilter(isNotExpired), streamFilter(isSubscribedToEvent), streamEach(sendEvent), streamEnd(sendEOSE), @@ -117,3 +124,4 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable { } } + diff --git a/test/unit/handlers/subscribe-message-handler.spec.ts b/test/unit/handlers/subscribe-message-handler.spec.ts index adcdbb5a..c269a75e 100644 --- a/test/unit/handlers/subscribe-message-handler.spec.ts +++ b/test/unit/handlers/subscribe-message-handler.spec.ts @@ -3,6 +3,7 @@ import chai from 'chai' import chaiAsPromised from 'chai-as-promised' import EventEmitter from 'events' import Sinon from 'sinon' +import sinonChai from 'sinon-chai' import { IAbortable, IMessageHandler } from '../../../src/@types/message-handlers' import { MessageType, SubscribeMessage } from '../../../src/@types/messages' @@ -14,10 +15,14 @@ import { PassThrough } from 'stream' import { SubscribeMessageHandler } from '../../../src/handlers/subscribe-message-handler' import { WebSocketAdapterEvent } from '../../../src/constants/adapter' +chai.use(sinonChai) chai.use(chaiAsPromised) const { expect } = chai -const toDbEvent = (event: Event) => ({ +const toDbEvent = ( + event: Event, + metadata: { expires_at?: number, deleted_at?: Date | null } = {}, +) => ({ event_id: Buffer.from(event.id, 'hex'), event_kind: event.kind, event_pubkey: Buffer.from(event.pubkey, 'hex'), @@ -25,6 +30,7 @@ const toDbEvent = (event: Event) => ({ event_content: event.content, event_tags: event.tags, event_signature: Buffer.from(event.sig, 'hex'), + ...metadata, }) describe('SubscribeMessageHandler', () => { @@ -112,11 +118,13 @@ describe('SubscribeMessageHandler', () => { describe('#fetchAndSend', () => { let event: Event + let clock: Sinon.SinonFakeTimers let webSocketOnMessageStub: Sinon.SinonStub let webSocketOnSubscribeStub: Sinon.SinonStub let isClientSubscribedToEventStub: Sinon.SinonStub beforeEach(() => { + clock = Sinon.useFakeTimers(1665546189000) event = { 'id': 'b1601d26958e6508b7b9df0af609c652346c09392b6534d93aead9819a51b4ef', 'pubkey': '22e804d26ed16b68db5259e78449e96dab5d464c8f470bda3eb1a70467f2c793', @@ -136,6 +144,10 @@ describe('SubscribeMessageHandler', () => { //streamEndSpy = sandbox.spy(Stream, '_end' as any) }) + afterEach(() => { + clock.restore() + }) + it('does not send event if client is not subscribed to it', async () => { isClientSubscribedToEventStub.returns(always(false)) @@ -165,6 +177,53 @@ describe('SubscribeMessageHandler', () => { ) }) + it('does not send expired events', async () => { + isClientSubscribedToEventStub.returns(always(true)) + + const now = Math.floor(clock.now / 1000) + const promise = (handler as any).fetchAndSend(subscriptionId, filters) + + const expiredEvent: Event = { + ...event, + tags: [['expiration', String(now - 1)] as any], + } + + stream.write(toDbEvent(expiredEvent)) + stream.end() + + await promise + + expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters) + expect(webSocketOnMessageStub).to.have.been.calledOnceWithExactly( + ['EOSE', subscriptionId], + ) + }) + + it('sends event if expiration is in the future', async () => { + isClientSubscribedToEventStub.returns(always(true)) + + const now = Math.floor(clock.now / 1000) + const promise = (handler as any).fetchAndSend(subscriptionId, filters) + + const eventWithFutureExpiration: Event = { + ...event, + tags: [['expiration', String(now + 60)] as any], + } + + stream.write(toDbEvent(eventWithFutureExpiration)) + stream.end() + + await promise + + expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters) + expect(webSocketOnMessageStub).to.have.been.calledWithExactly( + ['EVENT', subscriptionId, eventWithFutureExpiration], + ) + expect(webSocketOnMessageStub).to.have.been.calledWithExactly( + ['EOSE', subscriptionId], + ) + }) + it('sends EOSE', async () => { const promise = (handler as any).fetchAndSend(subscriptionId, filters) From 8d1771baa64799ab97bbf2bc0b499dcd2a1fd1ef Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:41:07 +0530 Subject: [PATCH 2/7] feat(dashboard): Implemented basic snapshot service, polling service and ws connection management --- .../polling/polling-scheduler.ts | 43 ++++++ .../services/snapshot-service.ts | 34 +++++ src/dashboard-service/ws/dashboard-ws-hub.ts | 141 ++++++++++++++++++ 3 files changed, 218 insertions(+) create mode 100644 src/dashboard-service/polling/polling-scheduler.ts create mode 100644 src/dashboard-service/services/snapshot-service.ts create mode 100644 src/dashboard-service/ws/dashboard-ws-hub.ts diff --git a/src/dashboard-service/polling/polling-scheduler.ts b/src/dashboard-service/polling/polling-scheduler.ts new file mode 100644 index 00000000..c24898d9 --- /dev/null +++ b/src/dashboard-service/polling/polling-scheduler.ts @@ -0,0 +1,43 @@ +import { createLogger } from '../../factories/logger-factory' + +type Tick = () => Promise | void + +const debug = createLogger('dashboard-service:polling') + +export class PollingScheduler { + private timer: NodeJS.Timer | undefined + + public constructor( + private readonly intervalMs: number, + private readonly tick: Tick, + ) { } + + public start(): void { + if (this.timer) { + return + } + + debug('starting scheduler with interval %d ms', this.intervalMs) + + this.timer = setInterval(() => { + Promise.resolve(this.tick()) + .catch((error) => { + console.error('dashboard-service: polling tick failed', error) + }) + }, this.intervalMs) + } + + public stop(): void { + if (!this.timer) { + return + } + + debug('stopping scheduler') + clearInterval(this.timer) + this.timer = undefined + } + + public isRunning(): boolean { + return Boolean(this.timer) + } +} diff --git a/src/dashboard-service/services/snapshot-service.ts b/src/dashboard-service/services/snapshot-service.ts new file mode 100644 index 00000000..b1a511fe --- /dev/null +++ b/src/dashboard-service/services/snapshot-service.ts @@ -0,0 +1,34 @@ +import { KPISnapshot } from '../types' + +export class SnapshotService { + private sequence = 0 + + private snapshot: KPISnapshot = { + sequence: this.sequence, + generatedAt: new Date(0).toISOString(), + status: 'placeholder', + metrics: { + eventsByKind: [], + admittedUsers: null, + satsPaid: null, + topTalkers: [], + }, + } + + public getSnapshot(): KPISnapshot { + return this.snapshot + } + + // Phase 1 placeholder: advances sequence/time so polling and websocket flow can be validated end-to-end. + public refreshPlaceholder(): KPISnapshot { + this.sequence += 1 + + this.snapshot = { + ...this.snapshot, + sequence: this.sequence, + generatedAt: new Date().toISOString(), + } + + return this.snapshot + } +} diff --git a/src/dashboard-service/ws/dashboard-ws-hub.ts b/src/dashboard-service/ws/dashboard-ws-hub.ts new file mode 100644 index 00000000..1b3b330d --- /dev/null +++ b/src/dashboard-service/ws/dashboard-ws-hub.ts @@ -0,0 +1,141 @@ +import { DashboardServerMessage, KPISnapshot } from '../types' +import { RawData, WebSocketServer } from 'ws' +import { createLogger } from '../../factories/logger-factory' +import WebSocket from 'ws' + +const debug = createLogger('dashboard-service:ws') + +export class DashboardWebSocketHub { + public constructor( + private readonly webSocketServer: WebSocketServer, + private readonly getSnapshot: () => KPISnapshot, + ) { + console.info('dashboard-service: websocket hub initialized') + + this.webSocketServer + .on('connection', this.onConnection.bind(this)) + .on('close', () => { + console.info('dashboard-service: websocket server closed') + }) + .on('error', (error) => { + console.error('dashboard-service: websocket server error', error) + }) + } + + public broadcastSnapshot(snapshot: KPISnapshot): void { + this.broadcast({ + type: 'kpi.snapshot', + payload: snapshot, + }) + } + + public broadcastTick(sequence: number): void { + this.broadcast({ + type: 'kpi.tick', + payload: { + at: new Date().toISOString(), + sequence, + }, + }) + } + + public close(): void { + console.info('dashboard-service: closing websocket hub') + this.webSocketServer.clients.forEach((client) => { + client.close() + }) + this.webSocketServer.removeAllListeners() + } + + private onConnection(client: WebSocket): void { + const connectedClients = this.getConnectedClientsCount() + console.info('dashboard-service: websocket client connected (clients=%d)', connectedClients) + + client + .on('close', (code, reason) => { + console.info( + 'dashboard-service: websocket client disconnected (code=%d, reason=%s, clients=%d)', + code, + reason.toString(), + this.getConnectedClientsCount(), + ) + }) + .on('error', (error) => { + console.error('dashboard-service: websocket client error', error) + }) + .on('message', (raw) => { + this.onClientMessage(raw) + }) + + this.send(client, { + type: 'dashboard.connected', + payload: { + at: new Date().toISOString(), + }, + }) + + this.send(client, { + type: 'kpi.snapshot', + payload: this.getSnapshot(), + }) + + debug('dashboard websocket bootstrap snapshot sent') + } + + private onClientMessage(raw: RawData): void { + try { + const rawMessage = this.toUtf8(raw) + const message = JSON.parse(rawMessage) + debug('dashboard websocket client message received: %o', message) + } catch (error) { + console.error('dashboard-service: websocket message parsing failed', error) + } + } + + private broadcast(message: DashboardServerMessage): void { + this.webSocketServer.clients.forEach((client) => { + if (client.readyState !== WebSocket.OPEN) { + return + } + this.send(client, message) + }) + } + + private send(client: WebSocket, message: DashboardServerMessage): void { + if (client.readyState !== WebSocket.OPEN) { + return + } + + try { + client.send(JSON.stringify(message)) + } catch (error) { + console.error('dashboard-service: websocket send failed', error) + } + } + + private toUtf8(raw: RawData): string { + if (typeof raw === 'string') { + return raw + } + + if (Buffer.isBuffer(raw)) { + return raw.toString('utf8') + } + + if (Array.isArray(raw)) { + return raw.map((chunk) => { + if (Buffer.isBuffer(chunk)) { + return chunk.toString('utf8') + } + + return Buffer.from(chunk as ArrayBuffer).toString('utf8') + }).join('') + } + + return Buffer.from(raw).toString('utf8') + } + + private getConnectedClientsCount(): number { + return Array.from(this.webSocketServer.clients).filter((client) => client.readyState === WebSocket.OPEN).length + } +} From 4e8c91660803cff295a1009dde9b0ce9af002158 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:42:47 +0530 Subject: [PATCH 3/7] feat(dashboard): Implemented basic message types, handlers and API endpoints --- src/dashboard-service/api/dashboard-router.ts | 12 ++++++++ .../controllers/get-health-controller.ts | 12 ++++++++ .../get-kpi-snapshot-controller.ts | 20 +++++++++++++ .../get-health-request-handler.ts | 5 ++++ .../get-kpi-snapshot-request-handler.ts | 8 +++++ src/dashboard-service/types.ts | 30 +++++++++++++++++++ 6 files changed, 87 insertions(+) create mode 100644 src/dashboard-service/api/dashboard-router.ts create mode 100644 src/dashboard-service/controllers/get-health-controller.ts create mode 100644 src/dashboard-service/controllers/get-kpi-snapshot-controller.ts create mode 100644 src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts create mode 100644 src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts create mode 100644 src/dashboard-service/types.ts diff --git a/src/dashboard-service/api/dashboard-router.ts b/src/dashboard-service/api/dashboard-router.ts new file mode 100644 index 00000000..dc977c52 --- /dev/null +++ b/src/dashboard-service/api/dashboard-router.ts @@ -0,0 +1,12 @@ +import { Router } from 'express' + +import { createGetKPISnapshotRequestHandler } from '../handlers/request-handlers/get-kpi-snapshot-request-handler' +import { SnapshotService } from '../services/snapshot-service' + +export const createDashboardRouter = (snapshotService: SnapshotService): Router => { + const router = Router() + + router.get('/snapshot', createGetKPISnapshotRequestHandler(snapshotService)) + + return router +} diff --git a/src/dashboard-service/controllers/get-health-controller.ts b/src/dashboard-service/controllers/get-health-controller.ts new file mode 100644 index 00000000..0c7037a3 --- /dev/null +++ b/src/dashboard-service/controllers/get-health-controller.ts @@ -0,0 +1,12 @@ +import { Request, Response } from 'express' + +import { IController } from '../../@types/controllers' + +export class GetHealthController implements IController { + public async handleRequest(_request: Request, response: Response): Promise { + response + .status(200) + .setHeader('content-type', 'application/json; charset=utf-8') + .send({ status: 'ok' }) + } +} \ No newline at end of file diff --git a/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts new file mode 100644 index 00000000..741d9902 --- /dev/null +++ b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts @@ -0,0 +1,20 @@ +import { Request, Response } from 'express' + +import { DashboardSnapshotResponse } from '../types' +import { IController } from '../../@types/controllers' +import { SnapshotService } from '../services/snapshot-service' + +export class GetKPISnapshotController implements IController { + public constructor(private readonly snapshotService: SnapshotService) { } + + public async handleRequest(_request: Request, response: Response): Promise { + const payload: DashboardSnapshotResponse = { + data: this.snapshotService.getSnapshot(), + } + + response + .status(200) + .setHeader('content-type', 'application/json; charset=utf-8') + .send(payload) + } +} \ No newline at end of file diff --git a/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts new file mode 100644 index 00000000..e8f2cea4 --- /dev/null +++ b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts @@ -0,0 +1,5 @@ +import { withController } from '../../../handlers/request-handlers/with-controller-request-handler' + +import { GetHealthController } from '../../controllers/get-health-controller' + +export const getHealthRequestHandler = withController(() => new GetHealthController()) \ No newline at end of file diff --git a/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts new file mode 100644 index 00000000..507e181b --- /dev/null +++ b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts @@ -0,0 +1,8 @@ +import { withController } from '../../../handlers/request-handlers/with-controller-request-handler' + +import { GetKPISnapshotController } from '../../controllers/get-kpi-snapshot-controller' +import { SnapshotService } from '../../services/snapshot-service' + +export const createGetKPISnapshotRequestHandler = (snapshotService: SnapshotService) => { + return withController(() => new GetKPISnapshotController(snapshotService)) +} \ No newline at end of file diff --git a/src/dashboard-service/types.ts b/src/dashboard-service/types.ts new file mode 100644 index 00000000..02885a36 --- /dev/null +++ b/src/dashboard-service/types.ts @@ -0,0 +1,30 @@ +export interface TopTalker { + pubkey: string + count: number +} + +export interface KPISnapshot { + sequence: number + generatedAt: string + status: 'placeholder' + metrics: { + eventsByKind: Array<{ kind: string, count: number }> + admittedUsers: number | null + satsPaid: number | null + topTalkers: TopTalker[] + } +} + +export interface DashboardSnapshotResponse { + data: KPISnapshot +} + +export interface DashboardWebSocketEnvelope { + type: TType + payload: TPayload +} + +export type DashboardServerMessage = + | DashboardWebSocketEnvelope<'dashboard.connected', { at: string }> + | DashboardWebSocketEnvelope<'kpi.snapshot', KPISnapshot> + | DashboardWebSocketEnvelope<'kpi.tick', { at: string, sequence: number }> From 564f405a349e025ff118d1d05798fd32d3369859 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:43:51 +0530 Subject: [PATCH 4/7] feat(dashboard): Wired dashboard server --- src/dashboard-service/app.ts | 116 ++++++++++++++++++++++++++++++++ src/dashboard-service/config.ts | 28 ++++++++ src/dashboard-service/index.ts | 42 ++++++++++++ 3 files changed, 186 insertions(+) create mode 100644 src/dashboard-service/app.ts create mode 100644 src/dashboard-service/config.ts create mode 100644 src/dashboard-service/index.ts diff --git a/src/dashboard-service/app.ts b/src/dashboard-service/app.ts new file mode 100644 index 00000000..fd19a5cf --- /dev/null +++ b/src/dashboard-service/app.ts @@ -0,0 +1,116 @@ +import { createDashboardRouter } from './api/dashboard-router' +import { createLogger } from '../factories/logger-factory' +import { DashboardServiceConfig } from './config' +import { DashboardWebSocketHub } from './ws/dashboard-ws-hub' +import express from 'express' +import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler' +import http from 'http' +import { PollingScheduler } from './polling/polling-scheduler' +import { SnapshotService } from './services/snapshot-service' +import { WebSocketServer } from 'ws' + +const debug = createLogger('dashboard-service:app') + +export interface DashboardService { + readonly config: DashboardServiceConfig + readonly snapshotService: SnapshotService + readonly pollingScheduler: PollingScheduler + start(): Promise + stop(): Promise + getHttpPort(): number +} + +export const createDashboardService = (config: DashboardServiceConfig): DashboardService => { + console.info( + 'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d)', + config.host, + config.port, + config.wsPath, + config.pollIntervalMs, + ) + + const snapshotService = new SnapshotService() + + const app = express() + .disable('x-powered-by') + .get('/healthz', getHealthRequestHandler) + .use('/api/v1/kpis', createDashboardRouter(snapshotService)) + + const webServer = http.createServer(app) + const webSocketServer = new WebSocketServer({ + server: webServer, + path: config.wsPath, + }) + + const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot()) + + const pollingScheduler = new PollingScheduler(config.pollIntervalMs, () => { + const nextSnapshot = snapshotService.refreshPlaceholder() + debug('poll tick produced snapshot sequence=%d', nextSnapshot.sequence) + webSocketHub.broadcastTick(nextSnapshot.sequence) + webSocketHub.broadcastSnapshot(nextSnapshot) + }) + + const start = async () => { + if (webServer.listening) { + debug('start requested but service is already listening') + return + } + + console.info('dashboard-service: starting http and websocket servers') + + await new Promise((resolve, reject) => { + webServer.listen(config.port, config.host, () => { + const address = webServer.address() + debug('listening on %o', address) + console.info('dashboard-service: listening on %o', address) + resolve() + }) + webServer.once('error', (error) => { + console.error('dashboard-service: failed to start server', error) + reject(error) + }) + }) + + pollingScheduler.start() + console.info('dashboard-service: polling scheduler started') + } + + const stop = async () => { + console.info('dashboard-service: stopping service') + pollingScheduler.stop() + webSocketHub.close() + await new Promise((resolve, reject) => { + if (!webServer.listening) { + debug('stop requested while server was already stopped') + resolve() + return + } + + webServer.close((error) => { + if (error) { + console.error('dashboard-service: failed to stop cleanly', error) + reject(error) + return + } + + console.info('dashboard-service: http server closed') + resolve() + }) + }) + } + + const getHttpPort = (): number => { + const address = webServer.address() + return typeof address === 'object' && address !== null ? address.port : config.port + } + + return { + config, + snapshotService, + pollingScheduler, + start, + stop, + getHttpPort, + } +} diff --git a/src/dashboard-service/config.ts b/src/dashboard-service/config.ts new file mode 100644 index 00000000..3d888ae8 --- /dev/null +++ b/src/dashboard-service/config.ts @@ -0,0 +1,28 @@ +export interface DashboardServiceConfig { + host: string + port: number + wsPath: string + pollIntervalMs: number +} + +const parseInteger = (value: string | undefined, fallback: number): number => { + if (typeof value === 'undefined' || value === '') { + return fallback + } + + const parsed = Number(value) + if (!Number.isInteger(parsed) || parsed < 0) { + return fallback + } + + return parsed +} + +export const getDashboardServiceConfig = (): DashboardServiceConfig => { + return { + host: process.env.DASHBOARD_SERVICE_HOST ?? '127.0.0.1', + port: parseInteger(process.env.DASHBOARD_SERVICE_PORT, 8011), + wsPath: process.env.DASHBOARD_WS_PATH ?? '/api/v1/kpis/stream', + pollIntervalMs: parseInteger(process.env.DASHBOARD_POLL_INTERVAL_MS, 5000), + } +} diff --git a/src/dashboard-service/index.ts b/src/dashboard-service/index.ts new file mode 100644 index 00000000..4f64161c --- /dev/null +++ b/src/dashboard-service/index.ts @@ -0,0 +1,42 @@ +import { createLogger } from '../factories/logger-factory' + +import { createDashboardService } from './app' +import { getDashboardServiceConfig } from './config' + +const debug = createLogger('dashboard-service:index') + +const run = async () => { + const config = getDashboardServiceConfig() + console.info('dashboard-service: bootstrapping with config %o', config) + const service = createDashboardService(config) + + const shutdown = async () => { + console.info('dashboard-service: received shutdown signal') + debug('received shutdown signal') + await service.stop() + process.exit(0) + } + + process + .on('SIGINT', shutdown) + .on('SIGTERM', shutdown) + + process.on('uncaughtException', (error) => { + console.error('dashboard-service: uncaught exception', error) + }) + + process.on('unhandledRejection', (error) => { + console.error('dashboard-service: unhandled rejection', error) + }) + + await service.start() +} + +if (require.main === module) { + run().catch((error) => { + console.error('dashboard-service: unable to start', error) + process.exit(1) + }) +} + +export { run } From a8d119bbc6f0e3052c2ca30a2995fd43aa24ebfe Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:45:09 +0530 Subject: [PATCH 5/7] feat(dashboard/test): Added tests for dashboard endpoints --- test/unit/dashboard-service/app.spec.ts | 43 +++++++++++++++++++ .../polling-scheduler.spec.ts | 39 +++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 test/unit/dashboard-service/app.spec.ts create mode 100644 test/unit/dashboard-service/polling-scheduler.spec.ts diff --git a/test/unit/dashboard-service/app.spec.ts b/test/unit/dashboard-service/app.spec.ts new file mode 100644 index 00000000..67a092d6 --- /dev/null +++ b/test/unit/dashboard-service/app.spec.ts @@ -0,0 +1,43 @@ +import axios from 'axios' +import { createDashboardService } from '../../../src/dashboard-service/app' +import { expect } from 'chai' +import WebSocket from 'ws' + +describe('dashboard-service app', () => { + it('serves health, snapshot, and websocket endpoints', async () => { + const service = createDashboardService({ + host: '127.0.0.1', + port: 0, + wsPath: '/api/v1/kpis/stream', + pollIntervalMs: 1000, + }) + + await service.start() + + const port = service.getHttpPort() + + const healthResponse = await axios.get(`http://127.0.0.1:${port}/healthz`) + expect(healthResponse.status).to.equal(200) + + const snapshotResponse = await axios.get(`http://127.0.0.1:${port}/api/v1/kpis/snapshot`) + expect(snapshotResponse.status).to.equal(200) + + const snapshotJson = snapshotResponse.data as any + expect(snapshotJson.data).to.have.property('sequence') + + const ws = new WebSocket(`ws://127.0.0.1:${port}/api/v1/kpis/stream`) + + const connectedEvent = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('timeout waiting for ws message')), 2000) + ws.once('message', (raw) => { + clearTimeout(timeout) + resolve(JSON.parse(raw.toString())) + }) + }) + + expect(connectedEvent).to.have.property('type', 'dashboard.connected') + + ws.close() + await service.stop() + }) +}) diff --git a/test/unit/dashboard-service/polling-scheduler.spec.ts b/test/unit/dashboard-service/polling-scheduler.spec.ts new file mode 100644 index 00000000..28d1c929 --- /dev/null +++ b/test/unit/dashboard-service/polling-scheduler.spec.ts @@ -0,0 +1,39 @@ +import { expect } from 'chai' +import Sinon from 'sinon' + +import { PollingScheduler } from '../../../src/dashboard-service/polling/polling-scheduler' + +describe('PollingScheduler', () => { + let clock: Sinon.SinonFakeTimers + + beforeEach(() => { + clock = Sinon.useFakeTimers() + }) + + afterEach(() => { + clock.restore() + }) + + it('runs tick callback on interval while running', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(1000, tick) + + scheduler.start() + await clock.tickAsync(3000) + + expect(tick.callCount).to.equal(3) + scheduler.stop() + }) + + it('stops running when stop is called', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(500, tick) + + scheduler.start() + await clock.tickAsync(1000) + scheduler.stop() + await clock.tickAsync(1000) + + expect(tick.callCount).to.equal(2) + }) +}) From 4b6e89b1e4eec6e3d125b6949766927146e34101 Mon Sep 17 00:00:00 2001 From: Kushagra Date: Fri, 10 Apr 2026 14:46:07 +0530 Subject: [PATCH 6/7] feat: wired test scripts --- package.json | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/package.json b/package.json index 9d9f902f..7039f1b7 100644 --- a/package.json +++ b/package.json @@ -23,10 +23,12 @@ "main": "src/index.ts", "scripts": { "dev": "node -r ts-node/register src/index.ts", + "dev:dashboard": "node -r ts-node/register src/dashboard-service/index.ts", "clean": "rimraf ./{dist,.nyc_output,.test-reports,.coverage}", "build": "tsc --project tsconfig.build.json", "prestart": "npm run build", "start": "cd dist && node src/index.js", + "start:dashboard": "node dist/src/dashboard-service/index.js", "build:check": "npm run build -- --noEmit", "lint": "eslint --ext .ts ./src ./test", "lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test", @@ -37,6 +39,8 @@ "db:seed": "knex seed:run", "pretest:unit": "mkdir -p .test-reports/unit", "test:unit": "mocha 'test/**/*.spec.ts'", + "test:unit:dashboard": "mocha 'test/unit/dashboard-service/**/*.spec.ts'", + "test:e2e:dashboard:ws": "node scripts/check_dashboard_ws_updates.js", "test:unit:watch": "npm run test:unit -- --min --watch --watch-files src/**/*,test/**/*", "cover:unit": "nyc --report-dir .coverage/unit npm run test:unit", "docker:build": "docker build -t nostream .", From 28807f441188dba7fb0dc882572d55d7a3783c0c Mon Sep 17 00:00:00 2001 From: Kushagra Date: Sun, 12 Apr 2026 04:22:35 +0530 Subject: [PATCH 7/7] feat(dashboard): Added simple polling based KPI collection service(whole table sanner) --- .../services/kpi-collector-service.ts | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 src/dashboard-service/services/kpi-collector-service.ts diff --git a/src/dashboard-service/services/kpi-collector-service.ts b/src/dashboard-service/services/kpi-collector-service.ts new file mode 100644 index 00000000..51139733 --- /dev/null +++ b/src/dashboard-service/services/kpi-collector-service.ts @@ -0,0 +1,134 @@ +import { DashboardMetrics, EventsByKindCount, TopTalker } from '../types' +import { createLogger } from '../../factories/logger-factory' +import { DatabaseClient } from '../../@types/base' + +const debug = createLogger('dashboard-service:kpi-collector') + +const DEFAULT_TRACKED_KINDS = [7, 1, 6, 1984, 4, 3, 9735] + +const toNumber = (value: unknown): number => { + if (typeof value === 'number') { + return value + } + + if (typeof value === 'string' && value !== '') { + return Number(value) + } + + return 0 +} + +export class KPICollectorService { + public constructor( + private readonly dbClient: DatabaseClient, + private readonly trackedKinds: number[] = DEFAULT_TRACKED_KINDS, + private readonly topTalkersLimit = 10, + private readonly recentDays = 3, + ) { } + + public async collectMetrics(): Promise { + debug('collecting dashboard metrics') + + const [ + eventsByKind, + admittedUsers, + satsPaid, + allTimeTopTalkers, + recentTopTalkers, + ] = await Promise.all([ + this.getEventsByKind(), + this.getAdmittedUsersCount(), + this.getSatsPaidCount(), + this.getTopTalkersAllTime(), + this.getTopTalkersRecent(), + ]) + + return { + eventsByKind, + admittedUsers, + satsPaid, + topTalkers: { + allTime: allTimeTopTalkers, + recent: recentTopTalkers, + }, + } + } + + private async getEventsByKind(): Promise { + const rows = await this.dbClient('events') + .select('event_kind') + .count('id as count') + .whereIn('event_kind', this.trackedKinds) + .groupBy('event_kind') + .orderBy('count', 'desc') as Array<{ event_kind: number, count: string }> + + const other = await this.dbClient('events') + .whereNotIn('event_kind', this.trackedKinds) + .count<{ count: string }>('id as count') + .first() + + const eventsByKind = rows.map((row) => { + return { + kind: String(row.event_kind), + count: toNumber(row.count), + } + }) + + eventsByKind.push({ + kind: 'other', + count: toNumber(other?.count), + }) + + return eventsByKind + } + + private async getAdmittedUsersCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .count<{ count: string }>('pubkey as count') + .first() + + return toNumber(result?.count) + } + + private async getSatsPaidCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .sum<{ total: string | null }>('balance as total') + .first() + + const millisats = toNumber(result?.total) + return millisats / 1000 + } + + private async getTopTalkersAllTime(): Promise { + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } + + private async getTopTalkersRecent(): Promise { + const since = new Date(Date.now() - this.recentDays * 24 * 60 * 60 * 1000) + + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .where('first_seen', '>=', since) + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } +}