diff --git a/package-lock.json b/package-lock.json index 0ecb6ac4..9697cec0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -45,7 +45,7 @@ "@types/express": "4.17.15", "@types/js-yaml": "4.0.5", "@types/mocha": "^9.1.1", - "@types/node": "^24.0.0", + "@types/node": "^24.12.2", "@types/pg": "^8.6.5", "@types/ramda": "^0.28.13", "@types/sinon": "^10.0.11", diff --git a/package.json b/package.json index 9d9f902f..1fa4a640 100644 --- a/package.json +++ b/package.json @@ -86,7 +86,7 @@ "@types/express": "4.17.15", "@types/js-yaml": "4.0.5", "@types/mocha": "^9.1.1", - "@types/node": "^24.0.0", + "@types/node": "^24.12.2", "@types/pg": "^8.6.5", "@types/ramda": "^0.28.13", "@types/sinon": "^10.0.11", diff --git a/src/@types/adapters.ts b/src/@types/adapters.ts index 0e491c3a..b8c06eab 100644 --- a/src/@types/adapters.ts +++ b/src/@types/adapters.ts @@ -21,7 +21,7 @@ export type IWebSocketAdapter = EventEmitter & { export interface ICacheAdapter { getKey(key: string): Promise hasKey(key: string): Promise - setKey(key: string, value: string): Promise + setKey(key: string, value: string, expirySeconds?: number): Promise addToSortedSet(key: string, set: Record | Record[]): Promise removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise getRangeFromSortedSet(key: string, start: number, stop: number): Promise diff --git a/src/adapters/redis-adapter.ts b/src/adapters/redis-adapter.ts index fcb5a39e..e2df49ba 100644 --- a/src/adapters/redis-adapter.ts +++ b/src/adapters/redis-adapter.ts @@ -42,7 +42,7 @@ export class RedisAdapter implements ICacheAdapter { } private onClientError(error: Error) { - console.error('Unable to connect to Redis.', error) + debug('Unable to connect to Redis.', error) // throw error } @@ -58,9 +58,12 @@ export class RedisAdapter implements ICacheAdapter { return this.client.get(key) } - public async setKey(key: string, value: string): Promise { + public async setKey(key: string, value: string, expirySeconds?: number): Promise { await this.connection - debug('get %s key', key) + debug('set %s key', key) + if (typeof expirySeconds === 'number') { + return 'OK' === await this.client.set(key, value, { EX: expirySeconds }) + } return 'OK' === await this.client.set(key, value) } diff --git a/src/constants/caching.ts b/src/constants/caching.ts new file mode 100644 index 00000000..7bd6e556 --- /dev/null +++ b/src/constants/caching.ts @@ -0,0 +1,5 @@ +export enum CacheAdmissionState { + ADMITTED = 'admitted', + BLOCKED_NOT_ADMITTED = 'blocked_not_admitted', + BLOCKED_INSUFFICIENT_BALANCE = 'blocked_insufficient_balance', +} diff --git a/src/factories/message-handler-factory.ts b/src/factories/message-handler-factory.ts index 34c37493..7ecb0786 100644 --- a/src/factories/message-handler-factory.ts +++ b/src/factories/message-handler-factory.ts @@ -1,13 +1,23 @@ +import { ICacheAdapter, IWebSocketAdapter } from '../@types/adapters' import { IEventRepository, IUserRepository } from '../@types/repositories' import { IncomingMessage, MessageType } from '../@types/messages' import { createSettings } from './settings-factory' import { EventMessageHandler } from '../handlers/event-message-handler' import { eventStrategyFactory } from './event-strategy-factory' -import { IWebSocketAdapter } from '../@types/adapters' +import { getCacheClient } from '../cache/client' +import { RedisAdapter } from '../adapters/redis-adapter' import { slidingWindowRateLimiterFactory } from './rate-limiter-factory' import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler' import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler' +let cacheAdapter: ICacheAdapter | undefined = undefined +const getCache = (): ICacheAdapter => { + if (!cacheAdapter) { + cacheAdapter = new RedisAdapter(getCacheClient()) + } + return cacheAdapter +} + export const messageHandlerFactory = ( eventRepository: IEventRepository, userRepository: IUserRepository, @@ -22,6 +32,7 @@ export const messageHandlerFactory = ( userRepository, createSettings, slidingWindowRateLimiterFactory, + getCache(), ) } case MessageType.REQ: diff --git a/src/handlers/event-message-handler.ts b/src/handlers/event-message-handler.ts index 43d1ba22..2b293f25 100644 --- a/src/handlers/event-message-handler.ts +++ b/src/handlers/event-message-handler.ts @@ -15,9 +15,11 @@ import { } from '../utils/event' import { IEventRepository, IUserRepository } from '../@types/repositories' import { IEventStrategy, IMessageHandler } from '../@types/message-handlers' +import { CacheAdmissionState } from '../constants/caching' import { createCommandResult } from '../utils/messages' import { createLogger } from '../factories/logger-factory' import { Factory } from '../@types/base' +import { ICacheAdapter } from '../@types/adapters' import { IncomingEventMessage } from '../@types/messages' import { IRateLimiter } from '../@types/utils' import { IWebSocketAdapter } from '../@types/adapters' @@ -33,6 +35,7 @@ export class EventMessageHandler implements IMessageHandler { protected readonly userRepository: IUserRepository, private readonly settings: () => Settings, private readonly slidingWindowRateLimiter: Factory, + private readonly cache: ICacheAdapter, ) {} public async handleMessage(message: IncomingEventMessage): Promise { @@ -92,7 +95,6 @@ export class EventMessageHandler implements IMessageHandler { try { await strategy.execute(event) } catch (error) { - console.error('error handling message', message, error) this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event')) } } @@ -313,17 +315,44 @@ export class EventMessageHandler implements IMessageHandler { return } - // const hasKey = await this.cache.hasKey(`${event.pubkey}:is-admitted`) - // TODO: use cache + const cacheKey = `${event.pubkey}:is-admitted` + + try { + const cachedValue = await this.cache.getKey(cacheKey) + if (cachedValue === CacheAdmissionState.ADMITTED) { + debug('cache hit for %s admission: admitted', event.pubkey) + return + } + if (cachedValue === CacheAdmissionState.BLOCKED_NOT_ADMITTED) { + debug('cache hit for %s admission: blocked', event.pubkey) + return 'blocked: pubkey not admitted' + } + if (cachedValue === CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE) { + debug('cache hit for %s admission: insufficient balance', event.pubkey) + return 'blocked: insufficient balance' + } + } catch (error) { + debug('cache error for %s: %o', event.pubkey, error) + } + const user = await this.userRepository.findByPubkey(event.pubkey) if (!user || !user.isAdmitted) { + this.cacheSet(cacheKey, CacheAdmissionState.BLOCKED_NOT_ADMITTED, 60) return 'blocked: pubkey not admitted' } const minBalance = currentSettings.limits?.event?.pubkey?.minBalance ?? 0n if (minBalance > 0n && user.balance < minBalance) { + this.cacheSet(cacheKey, CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE, 60) return 'blocked: insufficient balance' } + + this.cacheSet(cacheKey, CacheAdmissionState.ADMITTED, 300) + } + + private cacheSet(key: string, value: string, ttl: number): void { + this.cache.setKey(key, value, ttl) + .catch((error) => debug('unable to cache %s: %o', key, error)) } protected addExpirationMetadata(event: Event): Event | ExpiringEvent { diff --git a/test/unit/handlers/event-message-handler.spec.ts b/test/unit/handlers/event-message-handler.spec.ts index 6895ec7c..90531705 100644 --- a/test/unit/handlers/event-message-handler.spec.ts +++ b/test/unit/handlers/event-message-handler.spec.ts @@ -10,6 +10,7 @@ chai.use(chaiAsPromised) import { EventLimits, Settings } from '../../../src/@types/settings' import { IncomingEventMessage, MessageType } from '../../../src/@types/messages' +import { CacheAdmissionState } from '../../../src/constants/caching' import { Event } from '../../../src/@types/event' import { EventKinds } from '../../../src/constants/base' import { EventMessageHandler } from '../../../src/handlers/event-message-handler' @@ -88,7 +89,8 @@ describe('EventMessageHandler', () => { () => ({ info: { relay_url: 'relay_url' }, }) as any, - () => ({ hit: async () => false }) + () => ({ hit: async () => false }), + { hasKey: async () => false, setKey: async () => true } as any, ) }) @@ -262,7 +264,8 @@ describe('EventMessageHandler', () => { { hasActiveRequestToVanish: async () => false } as any, userRepository, () => settings, - () => ({ hit: async () => false }) + () => ({ hit: async () => false }), + { hasKey: async () => false, setKey: async () => true } as any, ) }) @@ -738,7 +741,8 @@ describe('EventMessageHandler', () => { { hasActiveRequestToVanish: async () => false } as any, userRepository, () => settings, - () => ({ hit: rateLimiterHitStub }) + () => ({ hit: rateLimiterHitStub }), + { hasKey: async () => false, setKey: async () => true } as any, ) }) @@ -953,6 +957,7 @@ describe('EventMessageHandler', () => { let webSocket: IWebSocketAdapter let getRelayPublicKeyStub: SinonStub let userRepositoryFindByPubkeyStub: SinonStub + let cacheStub: any beforeEach(() => { settings = { @@ -1000,13 +1005,19 @@ describe('EventMessageHandler', () => { userRepository = { findByPubkey: userRepositoryFindByPubkeyStub, } as any + cacheStub = { + hasKey: sandbox.stub().resolves(false), + getKey: sandbox.stub().resolves(null), + setKey: sandbox.stub().resolves(true), + } handler = new EventMessageHandler( webSocket, () => null, { hasActiveRequestToVanish: async () => false } as any, userRepository, () => settings, - () => ({ hit: async () => false }) + () => ({ hit: async () => false }), + cacheStub, ) }) @@ -1108,5 +1119,51 @@ describe('EventMessageHandler', () => { return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined }) + + describe('caching', () => { + it('fulfills with undefined and uses cache hit for admitted user without hitting DB', async () => { + cacheStub.getKey.resolves(CacheAdmissionState.ADMITTED) + + await expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined + expect(userRepositoryFindByPubkeyStub).not.to.have.been.called + }) + + it('fulfills with reason and uses cache hit for blocked user without hitting DB', async () => { + cacheStub.getKey.resolves(CacheAdmissionState.BLOCKED_NOT_ADMITTED) + + await expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted') + expect(userRepositoryFindByPubkeyStub).not.to.have.been.called + }) + + it('fulfills with reason and uses cache hit for insufficient balance without hitting DB', async () => { + cacheStub.getKey.resolves(CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE) + + await expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: insufficient balance') + expect(userRepositoryFindByPubkeyStub).not.to.have.been.called + }) + + it('caches blocked status with 60s ttl when user is not found', async () => { + userRepositoryFindByPubkeyStub.resolves(undefined) + + await (handler as any).isUserAdmitted(event) + expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.BLOCKED_NOT_ADMITTED, 60) + }) + + it('caches insufficient balance status with 60s ttl when user balance is too low', async () => { + settings.limits.event.pubkey.minBalance = 100n + userRepositoryFindByPubkeyStub.resolves({ isAdmitted: true, balance: 50n }) + + await (handler as any).isUserAdmitted(event) + expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE, 60) + }) + + it('caches admitted status with 300s ttl when user is admitted and has balance', async () => { + settings.limits.event.pubkey.minBalance = 100n + userRepositoryFindByPubkeyStub.resolves({ isAdmitted: true, balance: 150n }) + + await (handler as any).isUserAdmitted(event) + expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.ADMITTED, 300) + }) + }) }) })