Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
"@types/express": "4.17.15",
"@types/js-yaml": "4.0.5",
"@types/mocha": "^9.1.1",
"@types/node": "^24.0.0",
"@types/node": "^24.12.2",
"@types/pg": "^8.6.5",
"@types/ramda": "^0.28.13",
"@types/sinon": "^10.0.11",
Expand Down
2 changes: 1 addition & 1 deletion src/@types/adapters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export type IWebSocketAdapter = EventEmitter & {
export interface ICacheAdapter {
getKey(key: string): Promise<string>
hasKey(key: string): Promise<boolean>
setKey(key: string, value: string): Promise<boolean>
setKey(key: string, value: string, expirySeconds?: number): Promise<boolean>
addToSortedSet(key: string, set: Record<string, string> | Record<string, string>[]): Promise<number>
removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number>
getRangeFromSortedSet(key: string, start: number, stop: number): Promise<string[]>
Expand Down
9 changes: 6 additions & 3 deletions src/adapters/redis-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class RedisAdapter implements ICacheAdapter {
}

private onClientError(error: Error) {
console.error('Unable to connect to Redis.', error)
debug('Unable to connect to Redis.', error)
// throw error
}

Expand All @@ -58,9 +58,12 @@ export class RedisAdapter implements ICacheAdapter {
return this.client.get(key)
}

public async setKey(key: string, value: string): Promise<boolean> {
public async setKey(key: string, value: string, expirySeconds?: number): Promise<boolean> {
await this.connection
debug('get %s key', key)
debug('set %s key', key)
if (typeof expirySeconds === 'number') {
return 'OK' === await this.client.set(key, value, { EX: expirySeconds })
}
return 'OK' === await this.client.set(key, value)
}

Expand Down
5 changes: 5 additions & 0 deletions src/constants/caching.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export enum CacheAdmissionState {
ADMITTED = 'admitted',
BLOCKED_NOT_ADMITTED = 'blocked_not_admitted',
BLOCKED_INSUFFICIENT_BALANCE = 'blocked_insufficient_balance',
}
13 changes: 12 additions & 1 deletion src/factories/message-handler-factory.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
import { ICacheAdapter, IWebSocketAdapter } from '../@types/adapters'
import { IEventRepository, IUserRepository } from '../@types/repositories'
import { IncomingMessage, MessageType } from '../@types/messages'
import { createSettings } from './settings-factory'
import { EventMessageHandler } from '../handlers/event-message-handler'
import { eventStrategyFactory } from './event-strategy-factory'
import { IWebSocketAdapter } from '../@types/adapters'
import { getCacheClient } from '../cache/client'
import { RedisAdapter } from '../adapters/redis-adapter'
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler'
import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler'

let cacheAdapter: ICacheAdapter | undefined = undefined
const getCache = (): ICacheAdapter => {
if (!cacheAdapter) {
cacheAdapter = new RedisAdapter(getCacheClient())
}
return cacheAdapter
}

export const messageHandlerFactory = (
eventRepository: IEventRepository,
userRepository: IUserRepository,
Expand All @@ -22,6 +32,7 @@ export const messageHandlerFactory = (
userRepository,
createSettings,
slidingWindowRateLimiterFactory,
getCache(),
)
}
case MessageType.REQ:
Expand Down
35 changes: 32 additions & 3 deletions src/handlers/event-message-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import {
} from '../utils/event'
import { IEventRepository, IUserRepository } from '../@types/repositories'
import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
import { CacheAdmissionState } from '../constants/caching'
import { createCommandResult } from '../utils/messages'
import { createLogger } from '../factories/logger-factory'
import { Factory } from '../@types/base'
import { ICacheAdapter } from '../@types/adapters'
import { IncomingEventMessage } from '../@types/messages'
import { IRateLimiter } from '../@types/utils'
import { IWebSocketAdapter } from '../@types/adapters'
Expand All @@ -33,6 +35,7 @@ export class EventMessageHandler implements IMessageHandler {
protected readonly userRepository: IUserRepository,
private readonly settings: () => Settings,
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
private readonly cache: ICacheAdapter,
) {}

public async handleMessage(message: IncomingEventMessage): Promise<void> {
Expand Down Expand Up @@ -92,7 +95,6 @@ export class EventMessageHandler implements IMessageHandler {
try {
await strategy.execute(event)
} catch (error) {
console.error('error handling message', message, error)
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
}
}
Expand Down Expand Up @@ -313,17 +315,44 @@ export class EventMessageHandler implements IMessageHandler {
return
}

// const hasKey = await this.cache.hasKey(`${event.pubkey}:is-admitted`)
// TODO: use cache
const cacheKey = `${event.pubkey}:is-admitted`

try {
const cachedValue = await this.cache.getKey(cacheKey)
if (cachedValue === CacheAdmissionState.ADMITTED) {
debug('cache hit for %s admission: admitted', event.pubkey)
return
}
if (cachedValue === CacheAdmissionState.BLOCKED_NOT_ADMITTED) {
debug('cache hit for %s admission: blocked', event.pubkey)
return 'blocked: pubkey not admitted'
}
if (cachedValue === CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE) {
debug('cache hit for %s admission: insufficient balance', event.pubkey)
return 'blocked: insufficient balance'
}
} catch (error) {
debug('cache error for %s: %o', event.pubkey, error)
}

const user = await this.userRepository.findByPubkey(event.pubkey)
if (!user || !user.isAdmitted) {
this.cacheSet(cacheKey, CacheAdmissionState.BLOCKED_NOT_ADMITTED, 60)
return 'blocked: pubkey not admitted'
}

const minBalance = currentSettings.limits?.event?.pubkey?.minBalance ?? 0n
if (minBalance > 0n && user.balance < minBalance) {
this.cacheSet(cacheKey, CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE, 60)
return 'blocked: insufficient balance'
}

this.cacheSet(cacheKey, CacheAdmissionState.ADMITTED, 300)
}

private cacheSet(key: string, value: string, ttl: number): void {
this.cache.setKey(key, value, ttl)
.catch((error) => debug('unable to cache %s: %o', key, error))
}

protected addExpirationMetadata(event: Event): Event | ExpiringEvent {
Expand Down
65 changes: 61 additions & 4 deletions test/unit/handlers/event-message-handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ chai.use(chaiAsPromised)

import { EventLimits, Settings } from '../../../src/@types/settings'
import { IncomingEventMessage, MessageType } from '../../../src/@types/messages'
import { CacheAdmissionState } from '../../../src/constants/caching'
import { Event } from '../../../src/@types/event'
import { EventKinds } from '../../../src/constants/base'
import { EventMessageHandler } from '../../../src/handlers/event-message-handler'
Expand Down Expand Up @@ -88,7 +89,8 @@ describe('EventMessageHandler', () => {
() => ({
info: { relay_url: 'relay_url' },
}) as any,
() => ({ hit: async () => false })
() => ({ hit: async () => false }),
{ hasKey: async () => false, setKey: async () => true } as any,
)
})

Expand Down Expand Up @@ -262,7 +264,8 @@ describe('EventMessageHandler', () => {
{ hasActiveRequestToVanish: async () => false } as any,
userRepository,
() => settings,
() => ({ hit: async () => false })
() => ({ hit: async () => false }),
{ hasKey: async () => false, setKey: async () => true } as any,
)
})

Expand Down Expand Up @@ -738,7 +741,8 @@ describe('EventMessageHandler', () => {
{ hasActiveRequestToVanish: async () => false } as any,
userRepository,
() => settings,
() => ({ hit: rateLimiterHitStub })
() => ({ hit: rateLimiterHitStub }),
{ hasKey: async () => false, setKey: async () => true } as any,
)
})

Expand Down Expand Up @@ -953,6 +957,7 @@ describe('EventMessageHandler', () => {
let webSocket: IWebSocketAdapter
let getRelayPublicKeyStub: SinonStub
let userRepositoryFindByPubkeyStub: SinonStub
let cacheStub: any

beforeEach(() => {
settings = {
Expand Down Expand Up @@ -1000,13 +1005,19 @@ describe('EventMessageHandler', () => {
userRepository = {
findByPubkey: userRepositoryFindByPubkeyStub,
} as any
cacheStub = {
hasKey: sandbox.stub().resolves(false),
getKey: sandbox.stub().resolves(null),
setKey: sandbox.stub().resolves(true),
}
handler = new EventMessageHandler(
webSocket,
() => null,
{ hasActiveRequestToVanish: async () => false } as any,
userRepository,
() => settings,
() => ({ hit: async () => false })
() => ({ hit: async () => false }),
cacheStub,
)
})

Expand Down Expand Up @@ -1108,5 +1119,51 @@ describe('EventMessageHandler', () => {

return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
})

describe('caching', () => {
it('fulfills with undefined and uses cache hit for admitted user without hitting DB', async () => {
cacheStub.getKey.resolves(CacheAdmissionState.ADMITTED)

await expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
expect(userRepositoryFindByPubkeyStub).not.to.have.been.called
})

it('fulfills with reason and uses cache hit for blocked user without hitting DB', async () => {
cacheStub.getKey.resolves(CacheAdmissionState.BLOCKED_NOT_ADMITTED)

await expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted')
expect(userRepositoryFindByPubkeyStub).not.to.have.been.called
})

it('fulfills with reason and uses cache hit for insufficient balance without hitting DB', async () => {
cacheStub.getKey.resolves(CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE)

await expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: insufficient balance')
expect(userRepositoryFindByPubkeyStub).not.to.have.been.called
})

it('caches blocked status with 60s ttl when user is not found', async () => {
userRepositoryFindByPubkeyStub.resolves(undefined)

await (handler as any).isUserAdmitted(event)
expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.BLOCKED_NOT_ADMITTED, 60)
})

it('caches insufficient balance status with 60s ttl when user balance is too low', async () => {
settings.limits.event.pubkey.minBalance = 100n
userRepositoryFindByPubkeyStub.resolves({ isAdmitted: true, balance: 50n })

await (handler as any).isUserAdmitted(event)
expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE, 60)
})

it('caches admitted status with 300s ttl when user is admitted and has balance', async () => {
settings.limits.event.pubkey.minBalance = 100n
userRepositoryFindByPubkeyStub.resolves({ isAdmitted: true, balance: 150n })

await (handler as any).isUserAdmitted(event)
expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.ADMITTED, 300)
})
})
})
})