Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ Running `nostream` for the first time creates the settings file in `<project_roo
| limits.event.rateLimits[].rate | Maximum number of events during period. |
| limits.event.whitelists.pubkeys | List of public keys to ignore rate limits. |
| limits.event.whitelists.ipAddresses | List of IPs (IPv4 or IPv6) to ignore rate limits. |
| limits.event.retention.maxDays | Maximum number of days to retain events. Purge deletes events that are expired (`expires_at`), soft-deleted (`deleted_at`), or older than this window (`created_at`). Any non-positive value disables retention purge. |
| limits.event.retention.kind.whitelist | Event kinds excluded from retention purge. NIP-62 `REQUEST_TO_VANISH` is always excluded from retention purge, even if not listed here. |
| limits.event.retention.pubkey.whitelist | Public keys excluded from retention purge. |
| limits.client.subscription.maxSubscriptions | Maximum number of subscriptions per connected client. Defaults to 10. Disabled when set to zero. |
| limits.client.subscription.maxFilters | Maximum number of filters per subscription. Defaults to 10. Disabled when set to zero. |
| limits.message.rateLimits[].period | Rate limit period in milliseconds. |
Expand Down
7 changes: 7 additions & 0 deletions resources/default-settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ limits:
- "10.10.10.1"
- "::ffff:10.10.10.1"
event:
retention:
maxDays: -1
kind:
whitelist:
- 62
pubkey:
whitelist: []
eventId:
minLeadingZeroBits: 0
kind:
Expand Down
15 changes: 15 additions & 0 deletions src/@types/repositories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,24 @@ import { PassThrough } from 'stream'

import { DatabaseClient, EventId, Pubkey } from './base'
import { DBEvent, Event } from './event'
import { EventKinds } from '../constants/base'
import { EventKindsRange } from './settings'
import { Invoice } from './invoice'
import { SubscriptionFilter } from './subscription'
import { User } from './user'

export interface EventRetentionOptions {
maxDays?: number
kindWhitelist?: (EventKinds | EventKindsRange)[]
pubkeyWhitelist?: Pubkey[]
}

export interface EventPurgeCounts {
deleted: number
expired: number
retained: number
}

export type ExposedPromiseKeys = 'then' | 'catch' | 'finally'

export interface IQueryResult<T> extends Pick<Promise<T>, keyof Promise<T> & ExposedPromiseKeys> {
Expand All @@ -21,6 +35,7 @@ export interface IEventRepository {
deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise<number>
deleteByPubkeyExceptKinds(pubkey: Pubkey, excludedKinds: number[]): Promise<number>
hasActiveRequestToVanish(pubkey: Pubkey): Promise<boolean>
deleteExpiredAndRetained(options?: EventRetentionOptions): Promise<EventPurgeCounts>
}

export interface IInvoiceRepository {
Expand Down
4 changes: 4 additions & 0 deletions src/@types/services.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { Invoice } from './invoice'
import { Pubkey } from './base'

export interface IMaintenanceService {
clearOldEvents(): Promise<void>
}

export interface IPaymentsService {
getInvoiceFromPaymentsProcessor(invoice: string | Invoice): Promise<Partial<Invoice>>
createInvoice(
Expand Down
15 changes: 15 additions & 0 deletions src/@types/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ export interface EventWhitelists {
ipAddresses?: string[]
}

export interface EventRetentionKindLimits {
whitelist?: (EventKinds | EventKindsRange)[]
}

export interface EventRetentionPubkeyLimits {
whitelist?: Pubkey[]
}

export interface EventRetentionLimits {
maxDays?: number
kind?: EventRetentionKindLimits
pubkey?: EventRetentionPubkeyLimits
}

export interface EventLimits {
eventId?: EventIdLimits
pubkey?: PubkeyLimits
Expand All @@ -77,6 +91,7 @@ export interface EventLimits {
content?: ContentLimits | ContentLimits[]
rateLimits?: EventRateLimit[]
whitelists?: EventWhitelists
retention?: EventRetentionLimits
}

export interface ClientSubscriptionLimits {
Expand Down
38 changes: 36 additions & 2 deletions src/app/maintenance-worker.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import { IMaintenanceService, IPaymentsService } from '../@types/services'
import { mergeDeepLeft, path, pipe } from 'ramda'
import { IRunnable } from '../@types/base'

import { createLogger } from '../factories/logger-factory'
import { delayMs } from '../utils/misc'
import { InvoiceStatus } from '../@types/invoice'
import { IPaymentsService } from '../@types/services'
import { Settings } from '../@types/settings'

const UPDATE_INVOICE_INTERVAL = 60000
const CLEAR_OLD_EVENTS_TIMEOUT_MS = 5000

const debug = createLogger('maintenance-worker')

export class MaintenanceWorker implements IRunnable {
private interval: NodeJS.Timeout | undefined
private isRunning = false

public constructor(
private readonly process: NodeJS.Process,
private readonly paymentsService: IPaymentsService,
private readonly maintenanceService: IMaintenanceService,
private readonly settings: () => Settings,
) {
this.process
Expand All @@ -27,14 +30,43 @@ export class MaintenanceWorker implements IRunnable {
.on('unhandledRejection', this.onError.bind(this))
}

private async clearOldEventsSafely(): Promise<void> {
try {
await Promise.race([
this.maintenanceService.clearOldEvents(),
delayMs(CLEAR_OLD_EVENTS_TIMEOUT_MS).then(() => {
throw new Error(`clearOldEvents timed out after ${CLEAR_OLD_EVENTS_TIMEOUT_MS}ms`)
}),
])
} catch (error) {
debug('unable to clear old events: %o', error)
}
}

public run(): void {
this.interval = setInterval(() => this.onSchedule(), UPDATE_INVOICE_INTERVAL)
this.interval = setInterval(async () => {
if (this.isRunning) {
debug('skipping scheduled maintenance run because previous run is still in progress')
return
}

this.isRunning = true
try {
await this.onSchedule()
} catch (error) {
this.onError(error as Error)
} finally {
this.isRunning = false
}
}, UPDATE_INVOICE_INTERVAL)
}

private async onSchedule(): Promise<void> {
const currentSettings = this.settings()
const clearOldEventsPromise = this.clearOldEventsSafely()

if (!path(['payments','enabled'], currentSettings)) {
await clearOldEventsPromise
return
Comment on lines 64 to 70
}

Expand Down Expand Up @@ -84,6 +116,8 @@ export class MaintenanceWorker implements IRunnable {

debug('updated %d of %d invoices successfully', successful, invoices.length)
}

await clearOldEventsPromise
}

private onError(error: Error) {
Expand Down
11 changes: 11 additions & 0 deletions src/factories/maintenance-service-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
import { createSettings } from './settings-factory'
import { EventRepository } from '../repositories/event-repository'
import { MaintenanceService } from '../services/maintenance-service'

export const createMaintenanceService = () => {
return new MaintenanceService(
new EventRepository(getMasterDbClient(), getReadReplicaDbClient()),
createSettings
)
}
8 changes: 7 additions & 1 deletion src/factories/maintenance-worker-factory.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { createMaintenanceService } from './maintenance-service-factory'
import { createPaymentsService } from './payments-service-factory'
import { createSettings } from './settings-factory'
import { MaintenanceWorker } from '../app/maintenance-worker'

export const maintenanceWorkerFactory = () => {
return new MaintenanceWorker(process, createPaymentsService(), createSettings)
return new MaintenanceWorker(
process,
createPaymentsService(),
createMaintenanceService(),
createSettings
)
}
97 changes: 96 additions & 1 deletion src/repositories/event-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
import { ContextMetadataKey, EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey, EventKinds } from '../constants/base'
import { DatabaseClient, EventId } from '../@types/base'
import { DBEvent, Event } from '../@types/event'
import { IEventRepository, IQueryResult } from '../@types/repositories'
import { EventPurgeCounts, EventRetentionOptions, IEventRepository, IQueryResult } from '../@types/repositories'
import { toBuffer, toJSON } from '../utils/transform'
import { createLogger } from '../factories/logger-factory'
import { isGenericTagQuery } from '../utils/filter'
Expand Down Expand Up @@ -322,4 +322,99 @@ export class EventRepository implements IEventRepository {

return Boolean(result)
}

public deleteExpiredAndRetained(options?: EventRetentionOptions): Promise<EventPurgeCounts> {
const now = Math.floor(Date.now() / 1000)
const maxDays = options?.maxDays

if (typeof maxDays !== 'number' || isNaN(maxDays) || maxDays <= 0) {
debug('skipping purge: retention.maxDays is not a positive number')
return Promise.resolve({
deleted: 0,
expired: 0,
retained: 0,
})
}

const retentionLimit = now - (maxDays * 86400)
const batchSize = 1000

debug('deleting expired and retained events (retentionLimit: %d, now: %d, batchSize: %d)', retentionLimit, now, batchSize)

const kindWhitelist = [
...(Array.isArray(options?.kindWhitelist) ? options.kindWhitelist : []),
EventKinds.REQUEST_TO_VANISH,
].reduce<(number | [number, number])[]>((result, item) => {
const key = Array.isArray(item)
? `range:${item[0]}-${item[1]}`
: `kind:${item}`

if (!result.some((existing) => {
const existingKey = Array.isArray(existing)
? `range:${existing[0]}-${existing[1]}`
: `kind:${existing}`
return existingKey === key
})) {
result.push(item)
}

return result
}, [])

const candidates = this.masterDbClient('events')
.select('event_id')
.where(function () {
this.where('expires_at', '<', now)
.orWhereNotNull('deleted_at')
.orWhere('event_created_at', '<', retentionLimit)
})
.modify((query) => {
query.whereNot((builder) => {
kindWhitelist.forEach((kindOrRange) => {
if (Array.isArray(kindOrRange)) {
builder.orWhereBetween('event_kind', kindOrRange)
} else {
builder.orWhere('event_kind', kindOrRange)
}
})
})

if (Array.isArray(options?.pubkeyWhitelist) && options.pubkeyWhitelist.length > 0) {
query.whereNotIn('event_pubkey', map(toBuffer)(options.pubkeyWhitelist))
}
})
.limit(batchSize)

const query = this.masterDbClient('events')
.whereIn('event_id', candidates)
.del(['deleted_at', 'expires_at', 'event_created_at'])

const mapToCounts = (deletedRows: Pick<DBEvent, 'deleted_at' | 'expires_at' | 'event_created_at'>[]): EventPurgeCounts => deletedRows.reduce((counts, row) => {
if (row.deleted_at) {
counts.deleted += 1
} else if (typeof row.expires_at === 'number' && row.expires_at < now) {
counts.expired += 1
} else if (row.event_created_at < retentionLimit) {
counts.retained += 1
}

return counts
}, {
deleted: 0,
expired: 0,
retained: 0,
})

const getPromise = () => query.then((rows: any) => mapToCounts(rows))

return {
then: <T1, T2>(
onfulfilled?: ((value: EventPurgeCounts) => T1 | PromiseLike<T1>) | null,
onrejected?: ((reason: any) => T2 | PromiseLike<T2>) | null,
) => getPromise().then(onfulfilled as any, onrejected as any),
catch: <T>(onrejected?: ((reason: any) => T | PromiseLike<T>) | null) => getPromise().catch(onrejected as any),
finally: (onfinally?: (() => void) | null) => getPromise().finally(onfinally as any),
toString: (): string => query.toString(),
} as Promise<EventPurgeCounts> & { toString(): string }
}
}
38 changes: 38 additions & 0 deletions src/services/maintenance-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { createLogger } from '../factories/logger-factory'
import { IEventRepository } from '../@types/repositories'
import { IMaintenanceService } from '../@types/services'
import { Settings } from '../@types/settings'

const debug = createLogger('maintenance-service')

export class MaintenanceService implements IMaintenanceService {
public constructor(
private readonly eventRepository: IEventRepository,
private readonly settings: () => Settings,
) {}

public async clearOldEvents(): Promise<void> {
const currentSettings = this.settings()
const retention = currentSettings.limits?.event?.retention
const maxDays = retention?.maxDays

if (typeof maxDays !== 'number' || isNaN(maxDays) || maxDays <= 0) {
return
}

try {
debug('purging deleted, expired and old events')
const deletedCounts = await this.eventRepository.deleteExpiredAndRetained({
maxDays,
kindWhitelist: retention?.kind?.whitelist,
pubkeyWhitelist: retention?.pubkey?.whitelist,
})
const totalDeleted = deletedCounts.deleted + deletedCounts.expired + deletedCounts.retained
if (totalDeleted > 0) {
console.info(`[Maintenance] Deleted events: deleted=${deletedCounts.deleted}, expired=${deletedCounts.expired}, retained=${deletedCounts.retained}.`)
}
} catch (error) {
console.error('Unable to purge events. Reason:', error)
}
}
}
Loading