Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 123 additions & 47 deletions workers/limiter/src/dbHelper.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
import { Collection, Db, ObjectId } from 'mongodb';
import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
import { WorkspaceWithTariffPlan } from '../types';
import HawkCatcher from '@hawk.so/nodejs';
import { CriticalError, NonCriticalError } from '../../../lib/workerErrors';

const WORKSPACE_PROJECTION = {
_id: 1,
name: 1,
isBlocked: 1,
blockedDate: 1,
lastChargeDate: 1,
billingPeriodEventsCount: 1,
tariffPlanId: 1,
} as const;

type WorkspaceForLimiter = Pick<
WorkspaceDBScheme,
'_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' | 'tariffPlanId'
>;

/**
* Class that implements methods used for interaction between limiter and db
*/
Expand All @@ -23,15 +38,49 @@ export class DbHelper {
*/
private workspacesCollection: Collection<WorkspaceDBScheme>;

/**
* Collection with tariff plans
*/
private plansCollection: Collection<PlanDBScheme>;

/**
* In-memory cache of tariff plans — avoids $lookup on the small plans collection per workspace
*/
private plans: PlanDBScheme[] = [];

/**
* Plan ids that were still missing after a cache refresh — don't trigger more refreshes for them
*/
private knownMissingPlanIds: Set<string> = new Set();

/**
* @param projects - projects collection
* @param workspaces - workspaces collection
* @param plans - plans collection
* @param eventsDbConnection - connection to events DB
*/
constructor(projects: Collection<ProjectDBScheme>, workspaces: Collection<WorkspaceDBScheme>, eventsDbConnection: Db) {
constructor(
projects: Collection<ProjectDBScheme>,
workspaces: Collection<WorkspaceDBScheme>,
plans: Collection<PlanDBScheme>,
eventsDbConnection: Db
) {
this.eventsDbConnection = eventsDbConnection;
this.projectsCollection = projects;
this.workspacesCollection = workspaces;
this.plansCollection = plans;
}

/**
* Fetches tariff plans from database and keeps them cached
*/
public async fetchPlans(): Promise<void> {
this.plans = await this.plansCollection.find({}).toArray();
this.knownMissingPlanIds.clear();

if (this.plans.length === 0) {
throw new CriticalError('Please add tariff plans to the database');
}
}

/**
Expand Down Expand Up @@ -148,71 +197,98 @@ export class DbHelper {
return this.projectsCollection.find(query).toArray();
}

/**
* Returns plan from cache, refetches once on miss
*
* @param planId - id of the plan to find
*/
private async resolvePlan(planId: WorkspaceDBScheme['tariffPlanId']): Promise<PlanDBScheme | null> {
let plan = this.findPlanById(planId);

if (plan) {
return plan;
}

const planIdStr = planId.toString();

if (this.knownMissingPlanIds.has(planIdStr)) {
return null;
}

await this.fetchPlans();
plan = this.findPlanById(planId);

if (!plan) {
this.knownMissingPlanIds.add(planIdStr);
}
Comment on lines +221 to +223
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to refresh cache when there is no plan found in the cache

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually already refreshes fetchPlans() runs right above on a miss. These lines only run when the plan is still absent after that refresh.


return plan ?? null;
Comment thread
Kuchizu marked this conversation as resolved.
}

/**
* @param planId - id of the plan to find
*/
private findPlanById(planId: WorkspaceDBScheme['tariffPlanId']): PlanDBScheme | undefined {
return this.plans.find((plan) => plan._id.toString() === planId.toString());
}

/**
* Returns a single workspace with its tariff plan by id
*
* @param id - workspace id
*/
private async getOneWorkspaceWithTariffPlan(id: string): Promise<WorkspaceWithTariffPlan> {
const pipeline = [
{
$match: {
_id: new ObjectId(id),
},
},
...this.tariffPlanLookupPipeline(),
];

const workspace = await this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline).next();
const workspace = await this.workspacesCollection
.find({ _id: new ObjectId(id) })
.project<WorkspaceForLimiter>(WORKSPACE_PROJECTION)
.next();

if (workspace === null) {
throw new NonCriticalError(`Workspace ${id} not found`, {
workspaceId: id,
});
Comment thread
Kuchizu marked this conversation as resolved.
}

return workspace;
const plan = await this.resolvePlan(workspace.tariffPlanId);

if (!plan) {
throw new NonCriticalError(`Tariff plan ${workspace.tariffPlanId.toString()} not found for workspace ${id}`, {
workspaceId: id,
});
}

return {
...workspace,
tariffPlan: plan,
};
}

/**
* Yields all workspaces with their tariff plans one by one
*/
private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator<WorkspaceWithTariffPlan> {
const pipeline = this.tariffPlanLookupPipeline();
const cursor = this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline);
const cursor = this.workspacesCollection
.find({})
.project<WorkspaceForLimiter>(WORKSPACE_PROJECTION);

for await (const workspace of cursor) {
yield workspace;
}
}
const plan = await this.resolvePlan(workspace.tariffPlanId);

/* eslint-disable-next-line */
private tariffPlanLookupPipeline(): any[] {
return [
{
$lookup: {
from: 'plans',
localField: 'tariffPlanId',
foreignField: '_id',
as: 'tariffPlan',
},
},
{
$unwind: {
path: '$tariffPlan',
},
},
{
$project: {
_id: 1,
name: 1,
isBlocked: 1,
blockedDate: 1,
lastChargeDate: 1,
billingPeriodEventsCount: 1,
tariffPlan: 1,
},
},
];
if (!plan) {
HawkCatcher.send(
new Error(`[Limiter] Tariff plan not found for workspace`),
{
workspaceId: workspace._id.toString(),
tariffPlanId: workspace.tariffPlanId?.toString(),
}
);
continue;
}

yield {
...workspace,
tariffPlan: plan,
};
}
}
}
}
7 changes: 5 additions & 2 deletions workers/limiter/src/index.ts
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add tests for plans caching

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Worker } from '../../../lib/worker';
import * as pkg from '../package.json';
import * as path from 'path';
import * as dotenv from 'dotenv';
import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
import HawkCatcher from '@hawk.so/nodejs';
import { MS_IN_SEC } from '../../../lib/utils/consts';
import LimiterEvent, { BlockWorkspaceEvent, UnblockWorkspaceEvent } from '../types/eventTypes';
Expand Down Expand Up @@ -68,8 +68,11 @@ export default class LimiterWorker extends Worker {

const projectsCollection = accountDbConnection.collection<ProjectDBScheme>('projects');
const workspacesCollection = accountDbConnection.collection<WorkspaceDBScheme>('workspaces');
const plansCollection = accountDbConnection.collection<PlanDBScheme>('plans');

this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, eventsDbConnection);
this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, plansCollection, eventsDbConnection);

await this.dbHelper.fetchPlans();

await this.redis.initialize();

Expand Down
Loading
Loading