diff --git a/workers/limiter/src/dbHelper.ts b/workers/limiter/src/dbHelper.ts index dbe45da0..b4cc845f 100644 --- a/workers/limiter/src/dbHelper.ts +++ b/workers/limiter/src/dbHelper.ts @@ -1,9 +1,24 @@ import { Collection, Db, ObjectId } from 'mongodb'; -import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; +import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import { WorkspaceWithTariffPlan } from '../types'; import HawkCatcher from '@hawk.so/nodejs'; import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; +const WORKSPACE_PROJECTION = { + _id: 1, + name: 1, + isBlocked: 1, + blockedDate: 1, + lastChargeDate: 1, + billingPeriodEventsCount: 1, + tariffPlanId: 1, +} as const; + +type WorkspaceForLimiter = Pick< + WorkspaceDBScheme, + '_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' | 'tariffPlanId' +>; + /** * Class that implements methods used for interaction between limiter and db */ @@ -23,15 +38,49 @@ export class DbHelper { */ private workspacesCollection: Collection; + /** + * Collection with tariff plans + */ + private plansCollection: Collection; + + /** + * In-memory cache of tariff plans — avoids $lookup on the small plans collection per workspace + */ + private plans: PlanDBScheme[] = []; + + /** + * Plan ids that were still missing after a cache refresh — don't trigger more refreshes for them + */ + private knownMissingPlanIds: Set = new Set(); + /** * @param projects - projects collection * @param workspaces - workspaces collection + * @param plans - plans collection * @param eventsDbConnection - connection to events DB */ - constructor(projects: Collection, workspaces: Collection, eventsDbConnection: Db) { + constructor( + projects: Collection, + workspaces: Collection, + plans: Collection, + eventsDbConnection: Db + ) { this.eventsDbConnection = eventsDbConnection; this.projectsCollection = projects; this.workspacesCollection = workspaces; + this.plansCollection = plans; + } + + /** + * Fetches tariff plans from database and keeps them cached + */ + public async fetchPlans(): Promise { + this.plans = await this.plansCollection.find({}).toArray(); + this.knownMissingPlanIds.clear(); + + if (this.plans.length === 0) { + throw new CriticalError('Please add tariff plans to the database'); + } } /** @@ -148,22 +197,51 @@ export class DbHelper { return this.projectsCollection.find(query).toArray(); } + /** + * Returns plan from cache, refetches once on miss + * + * @param planId - id of the plan to find + */ + private async resolvePlan(planId: WorkspaceDBScheme['tariffPlanId']): Promise { + let plan = this.findPlanById(planId); + + if (plan) { + return plan; + } + + const planIdStr = planId.toString(); + + if (this.knownMissingPlanIds.has(planIdStr)) { + return null; + } + + await this.fetchPlans(); + plan = this.findPlanById(planId); + + if (!plan) { + this.knownMissingPlanIds.add(planIdStr); + } + + return plan ?? null; + } + + /** + * @param planId - id of the plan to find + */ + private findPlanById(planId: WorkspaceDBScheme['tariffPlanId']): PlanDBScheme | undefined { + return this.plans.find((plan) => plan._id.toString() === planId.toString()); + } + /** * Returns a single workspace with its tariff plan by id * * @param id - workspace id */ private async getOneWorkspaceWithTariffPlan(id: string): Promise { - const pipeline = [ - { - $match: { - _id: new ObjectId(id), - }, - }, - ...this.tariffPlanLookupPipeline(), - ]; - - const workspace = await this.workspacesCollection.aggregate(pipeline).next(); + const workspace = await this.workspacesCollection + .find({ _id: new ObjectId(id) }) + .project(WORKSPACE_PROJECTION) + .next(); if (workspace === null) { throw new NonCriticalError(`Workspace ${id} not found`, { @@ -171,48 +249,46 @@ export class DbHelper { }); } - return workspace; + const plan = await this.resolvePlan(workspace.tariffPlanId); + + if (!plan) { + throw new NonCriticalError(`Tariff plan ${workspace.tariffPlanId.toString()} not found for workspace ${id}`, { + workspaceId: id, + }); + } + + return { + ...workspace, + tariffPlan: plan, + }; } /** * Yields all workspaces with their tariff plans one by one */ private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator { - const pipeline = this.tariffPlanLookupPipeline(); - const cursor = this.workspacesCollection.aggregate(pipeline); + const cursor = this.workspacesCollection + .find({}) + .project(WORKSPACE_PROJECTION); for await (const workspace of cursor) { - yield workspace; - } - } + const plan = await this.resolvePlan(workspace.tariffPlanId); - /* eslint-disable-next-line */ - private tariffPlanLookupPipeline(): any[] { - return [ - { - $lookup: { - from: 'plans', - localField: 'tariffPlanId', - foreignField: '_id', - as: 'tariffPlan', - }, - }, - { - $unwind: { - path: '$tariffPlan', - }, - }, - { - $project: { - _id: 1, - name: 1, - isBlocked: 1, - blockedDate: 1, - lastChargeDate: 1, - billingPeriodEventsCount: 1, - tariffPlan: 1, - }, - }, - ]; + if (!plan) { + HawkCatcher.send( + new Error(`[Limiter] Tariff plan not found for workspace`), + { + workspaceId: workspace._id.toString(), + tariffPlanId: workspace.tariffPlanId?.toString(), + } + ); + continue; + } + + yield { + ...workspace, + tariffPlan: plan, + }; + } } -} \ No newline at end of file +} diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 82ed2539..6ed21cfe 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -3,7 +3,7 @@ import { Worker } from '../../../lib/worker'; import * as pkg from '../package.json'; import * as path from 'path'; import * as dotenv from 'dotenv'; -import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; +import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types'; import HawkCatcher from '@hawk.so/nodejs'; import { MS_IN_SEC } from '../../../lib/utils/consts'; import LimiterEvent, { BlockWorkspaceEvent, UnblockWorkspaceEvent } from '../types/eventTypes'; @@ -68,8 +68,11 @@ export default class LimiterWorker extends Worker { const projectsCollection = accountDbConnection.collection('projects'); const workspacesCollection = accountDbConnection.collection('workspaces'); + const plansCollection = accountDbConnection.collection('plans'); - this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, eventsDbConnection); + this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, plansCollection, eventsDbConnection); + + await this.dbHelper.fetchPlans(); await this.redis.initialize(); diff --git a/workers/limiter/tests/dbHelper.test.ts b/workers/limiter/tests/dbHelper.test.ts index 947b070a..3e8eba54 100644 --- a/workers/limiter/tests/dbHelper.test.ts +++ b/workers/limiter/tests/dbHelper.test.ts @@ -4,6 +4,8 @@ import { GroupedEventDBScheme, PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme import { DbHelper } from '../src/dbHelper'; import { mockedPlans } from './plans.mock'; import { MS_IN_SEC } from '../../../lib/utils/consts'; +import { CriticalError, NonCriticalError } from '../../../lib/workerErrors'; +import HawkCatcher from '@hawk.so/nodejs'; /** * Constant of last charge date in all workspaces for tests @@ -130,7 +132,8 @@ describe('DbHelper', () => { await planCollection.deleteMany({}); await planCollection.insertMany(Object.values(mockedPlans)); - dbHelper = new DbHelper(projectCollection, workspaceCollection, db); + dbHelper = new DbHelper(projectCollection, workspaceCollection, planCollection, db); + await dbHelper.fetchPlans(); }, 30000); // 30 seconds timeout for MongoDB connection and setup beforeEach(async () => { @@ -203,6 +206,180 @@ describe('DbHelper', () => { }); }); + describe('plans caching', () => { + /** + * Restore the default plans cache after tests that mutate the plans collection or the cache + */ + afterEach(async () => { + jest.restoreAllMocks(); + await planCollection.deleteMany({}); + await planCollection.insertMany(Object.values(mockedPlans)); + await dbHelper.fetchPlans(); + }); + + test('Should serve plans from the in-memory cache without reading the plans collection per workspace', async () => { + /** + * Arrange + */ + const workspace1 = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + const workspace2 = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10000, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + + await workspaceCollection.insertMany([workspace1, workspace2]); + + /** + * Cache is already loaded in beforeAll — start watching the plans collection from now on + */ + const findSpy = jest.spyOn(planCollection, 'find'); + + /** + * Act + */ + const workspaces = []; + + for await (const workspace of dbHelper.getWorkspacesWithTariffPlans()) { + workspaces.push(workspace); + } + + /** + * Assert — both plans are resolved from the cache, the plans collection is never read + */ + expect(workspaces).toHaveLength(2); + expect(workspaces[0].tariffPlan.eventsLimit).toBe(10); + expect(workspaces[1].tariffPlan.eventsLimit).toBe(10000); + expect(findSpy).not.toHaveBeenCalled(); + }); + + test('Should refetch plans once on a cache miss and resolve a plan added after startup', async () => { + /** + * Arrange — a plan that exists in the database but not in the already-loaded cache + */ + const freshPlan: PlanDBScheme = { + _id: new ObjectId(), + name: 'Fresh plan', + monthlyCharge: 10, + monthlyChargeCurrency: 'RUB', + eventsLimit: 500, + isDefault: false, + }; + + await planCollection.insertOne(freshPlan); + + const workspace = createWorkspaceMock({ + plan: freshPlan, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + + await workspaceCollection.insertOne(workspace); + + const findSpy = jest.spyOn(planCollection, 'find'); + + /** + * Act + */ + const result = await dbHelper.getWorkspacesWithTariffPlans(workspace._id.toString()); + + /** + * Assert — the miss triggered exactly one refetch and the plan resolved from the refreshed cache + */ + expect(result.tariffPlan.eventsLimit).toBe(500); + expect(findSpy).toHaveBeenCalledTimes(1); + }); + + test('Should reload the plans collection only once for repeated misses of the same plan id', async () => { + /** + * Arrange — two workspaces referencing the same non-existent plan + */ + const missingPlanId = new ObjectId(); + + const workspace1 = createWorkspaceMock({ + plan: { + ...mockedPlans.eventsLimit10, + _id: missingPlanId, + }, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + const workspace2 = createWorkspaceMock({ + plan: { + ...mockedPlans.eventsLimit10, + _id: missingPlanId, + }, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + + await workspaceCollection.insertMany([workspace1, workspace2]); + + const findSpy = jest.spyOn(planCollection, 'find'); + const hawkCatcherSpy = jest.spyOn(HawkCatcher, 'send').mockImplementation(() => undefined); + + /** + * Act + */ + const workspaces = []; + + for await (const workspace of dbHelper.getWorkspacesWithTariffPlans()) { + workspaces.push(workspace); + } + + /** + * Assert — workspaces with a dangling plan are skipped and reported, and the missing id is + * memoized so the collection is reloaded only once despite two misses + */ + expect(workspaces).toHaveLength(0); + expect(findSpy).toHaveBeenCalledTimes(1); + expect(hawkCatcherSpy).toHaveBeenCalledTimes(2); + }); + + test('Should throw NonCriticalError when a single workspace references a non-existent plan', async () => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: { + ...mockedPlans.eventsLimit10, + _id: new ObjectId(), + }, + billingPeriodEventsCount: 0, + lastChargeDate: new Date(), + }); + + await workspaceCollection.insertOne(workspace); + + /** + * Act & Assert + */ + await expect( + dbHelper.getWorkspacesWithTariffPlans(workspace._id.toString()) + ).rejects.toThrow(NonCriticalError); + }); + + test('fetchPlans should throw CriticalError when the plans collection is empty', async () => { + /** + * Arrange — a helper pointed at an empty plans collection + */ + const emptyPlanCollection = db.collection('plans_empty_for_test'); + + await emptyPlanCollection.deleteMany({}); + + const helperWithoutPlans = new DbHelper(projectCollection, workspaceCollection, emptyPlanCollection, db); + + /** + * Act & Assert + */ + await expect(helperWithoutPlans.fetchPlans()).rejects.toThrow(CriticalError); + }); + }); + describe('updateWorkspacesEventsCountAndIsBlocked', () => { test('Should update multiple workspaces', async () => { /**