diff --git a/infrastructure/terraform/components/api/README.md b/infrastructure/terraform/components/api/README.md
index f85a3e5ee..46f826ce1 100644
--- a/infrastructure/terraform/components/api/README.md
+++ b/infrastructure/terraform/components/api/README.md
@@ -35,7 +35,7 @@ No requirements.
| [kms\_deletion\_window](#input\_kms\_deletion\_window) | When a kms key is deleted, how long should it wait in the pending deletion state? | `string` | `"30"` | no |
| [letter\_event\_source](#input\_letter\_event\_source) | Source value to use for the letter status event updates | `string` | `"/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status"` | no |
| [letter\_table\_ttl\_hours](#input\_letter\_table\_ttl\_hours) | Number of hours to set as TTL on letters table | `number` | `24` | no |
-| [letter\_variant\_map](#input\_letter\_variant\_map) | n/a | `map(object({ supplierId = string, specId = string }))` |
{
"lv1": {
"specId": "spec1",
"supplierId": "supplier1"
},
"lv2": {
"specId": "spec2",
"supplierId": "supplier1"
},
"lv3": {
"specId": "spec3",
"supplierId": "supplier2"
}
} | no |
+| [letter\_variant\_map](#input\_letter\_variant\_map) | n/a | `map(object({ supplierId = string, specId = string, priority = number }))` | {
"lv1": {
"priority": 10,
"specId": "spec1",
"supplierId": "supplier1"
},
"lv2": {
"priority": 10,
"specId": "spec2",
"supplierId": "supplier1"
},
"lv3": {
"priority": 10,
"specId": "spec3",
"supplierId": "supplier2"
}
} | no |
| [log\_level](#input\_log\_level) | The log level to be used in lambda functions within the component. Any log with a lower severity than the configured value will not be logged: https://docs.python.org/3/library/logging.html#levels | `string` | `"INFO"` | no |
| [log\_retention\_in\_days](#input\_log\_retention\_in\_days) | The retention period in days for the Cloudwatch Logs events to be retained, default of 0 is indefinite | `number` | `0` | no |
| [manually\_configure\_mtls\_truststore](#input\_manually\_configure\_mtls\_truststore) | Manually manage the truststore used for API Gateway mTLS (e.g. for prod environment) | `bool` | `false` | no |
diff --git a/infrastructure/terraform/components/api/ddb_table_letter_queue.tf b/infrastructure/terraform/components/api/ddb_table_letter_queue.tf
index b6952ab46..0aa940121 100644
--- a/infrastructure/terraform/components/api/ddb_table_letter_queue.tf
+++ b/infrastructure/terraform/components/api/ddb_table_letter_queue.tf
@@ -12,7 +12,7 @@ resource "aws_dynamodb_table" "letter_queue" {
local_secondary_index {
name = "queueSortOrder-index"
- range_key = "queueTimestamp"
+ range_key = "queueSortOrderSk"
projection_type = "ALL"
}
@@ -27,7 +27,7 @@ resource "aws_dynamodb_table" "letter_queue" {
}
attribute {
- name = "queueTimestamp"
+ name = "queueSortOrderSk"
type = "S"
}
diff --git a/infrastructure/terraform/components/api/variables.tf b/infrastructure/terraform/components/api/variables.tf
index 1a0cc0ef0..51af73e5d 100644
--- a/infrastructure/terraform/components/api/variables.tf
+++ b/infrastructure/terraform/components/api/variables.tf
@@ -136,11 +136,11 @@ variable "eventpub_control_plane_bus_arn" {
}
variable "letter_variant_map" {
- type = map(object({ supplierId = string, specId = string }))
+ type = map(object({ supplierId = string, specId = string, priority = number }))
default = {
- "lv1" = { supplierId = "supplier1", specId = "spec1" },
- "lv2" = { supplierId = "supplier1", specId = "spec2" },
- "lv3" = { supplierId = "supplier2", specId = "spec3" }
+ "lv1" = { supplierId = "supplier1", specId = "spec1", priority = 10 },
+ "lv2" = { supplierId = "supplier1", specId = "spec2", priority = 10 },
+ "lv3" = { supplierId = "supplier2", specId = "spec3", priority = 10 }
}
}
diff --git a/internal/datastore/src/__test__/db.ts b/internal/datastore/src/__test__/db.ts
index ae652ad13..a513b74b8 100644
--- a/internal/datastore/src/__test__/db.ts
+++ b/internal/datastore/src/__test__/db.ts
@@ -22,6 +22,7 @@ export async function setupDynamoDBContainer() {
accessKeyId: "fakeMyKeyId",
secretAccessKey: "fakeSecretAccessKey",
},
+ maxAttempts: 1,
});
const docClient = DynamoDBDocumentClient.from(ddbClient);
@@ -132,7 +133,7 @@ const createLetterQueueTableCommand = new CreateTableCommand({
IndexName: "queueSortOrder-index",
KeySchema: [
{ AttributeName: "supplierId", KeyType: "HASH" }, // Partition key for LSI
- { AttributeName: "queueTimestamp", KeyType: "RANGE" }, // Sort key for LSI
+ { AttributeName: "queueSortOrderSk", KeyType: "RANGE" }, // Sort key for LSI
],
Projection: {
ProjectionType: "ALL",
@@ -142,7 +143,7 @@ const createLetterQueueTableCommand = new CreateTableCommand({
AttributeDefinitions: [
{ AttributeName: "supplierId", AttributeType: "S" },
{ AttributeName: "letterId", AttributeType: "S" },
- { AttributeName: "queueTimestamp", AttributeType: "S" },
+ { AttributeName: "queueSortOrderSk", AttributeType: "S" },
],
});
diff --git a/internal/datastore/src/__test__/letter-queue-repository.test.ts b/internal/datastore/src/__test__/letter-queue-repository.test.ts
index 04e8d57ca..629d8aefd 100644
--- a/internal/datastore/src/__test__/letter-queue-repository.test.ts
+++ b/internal/datastore/src/__test__/letter-queue-repository.test.ts
@@ -12,12 +12,16 @@ import { LetterAlreadyExistsError } from "../letter-already-exists-error";
import { createTestLogger } from "./logs";
import { LetterDoesNotExistError } from "../letter-does-not-exist-error";
-function createLetter(letterId = "letter1"): InsertPendingLetter {
+function createLetter(
+ overrides: Partial = {},
+): InsertPendingLetter {
return {
- letterId,
+ letterId: "letter1",
supplierId: "supplier1",
specificationId: "specification1",
groupId: "group1",
+ priority: 10,
+ ...overrides,
};
}
@@ -54,9 +58,11 @@ describe("LetterQueueRepository", () => {
});
describe("putLetter", () => {
- it("adds a letter to the database", async () => {
+ beforeEach(() => {
jest.useFakeTimers().setSystemTime(new Date("2026-03-04T13:15:45.000Z"));
+ });
+ it("adds a letter to the database", async () => {
const pendingLetter =
await letterQueueRepository.putLetter(createLetter());
@@ -65,9 +71,32 @@ describe("LetterQueueRepository", () => {
"2026-03-04T13:15:45.000Z",
);
expect(pendingLetter.ttl).toBe(1_772_633_745);
+ expect(pendingLetter.queueSortOrderSk).toBe(
+ "10-2026-03-04T13:15:45.000Z",
+ );
expect(await letterExists(db, "supplier1", "letter1")).toBe(true);
});
+ it("left-pads the priority with zeros in the sort key", async () => {
+ const pendingLetter = await letterQueueRepository.putLetter(
+ createLetter({ priority: 5 }),
+ );
+
+ expect(pendingLetter.queueSortOrderSk).toBe(
+ "05-2026-03-04T13:15:45.000Z",
+ );
+ });
+
+ it("defaults a missing priority to 10 in the sort key", async () => {
+ const pendingLetter = await letterQueueRepository.putLetter(
+ createLetter({ priority: undefined }),
+ );
+
+ expect(pendingLetter.queueSortOrderSk).toBe(
+ "10-2026-03-04T13:15:45.000Z",
+ );
+ });
+
it("throws LetterAlreadyExistsError when creating a letter which already exists", async () => {
await letterQueueRepository.putLetter(createLetter());
@@ -122,16 +151,21 @@ describe("LetterQueueRepository", () => {
});
});
-async function letterExists(
- db: DBContext,
- supplierId: string,
- letterId: string,
-): Promise {
+async function getLetter(db: DBContext, supplierId: string, letterId: string) {
const result = await db.docClient.send(
new GetCommand({
TableName: db.config.letterQueueTableName,
Key: { supplierId, letterId },
}),
);
- return result.Item !== undefined;
+ return result.Item;
+}
+
+async function letterExists(
+ db: DBContext,
+ supplierId: string,
+ letterId: string,
+): Promise {
+ const letter = await getLetter(db, supplierId, letterId);
+ return letter !== undefined;
}
diff --git a/internal/datastore/src/letter-queue-repository.ts b/internal/datastore/src/letter-queue-repository.ts
index 70592db25..e30eadcfc 100644
--- a/internal/datastore/src/letter-queue-repository.ts
+++ b/internal/datastore/src/letter-queue-repository.ts
@@ -24,16 +24,22 @@ export default class LetterQueueRepository {
readonly config: LetterQueueRepositoryConfig,
) {}
+ private readonly defaultPriority = 10;
+
async putLetter(
insertPendingLetter: InsertPendingLetter,
): Promise {
// needs to be an ISO timestamp as Db sorts alphabetically
const now = new Date().toISOString();
-
+ const priority = String(
+ insertPendingLetter.priority ?? this.defaultPriority,
+ );
+ const queueSortOrderSk = `${priority.padStart(2, "0")}-${now}`;
const pendingLetter: PendingLetter = {
...insertPendingLetter,
queueTimestamp: now,
visibilityTimestamp: now,
+ queueSortOrderSk,
ttl: Math.floor(
Date.now() / 1000 + 60 * 60 * this.config.letterQueueTtlHours,
),
diff --git a/internal/datastore/src/types.ts b/internal/datastore/src/types.ts
index 107a6c8b8..efeefe345 100644
--- a/internal/datastore/src/types.ts
+++ b/internal/datastore/src/types.ts
@@ -43,6 +43,7 @@ export const LetterSchemaBase = z.object({
export const LetterSchema = LetterSchemaBase.extend({
supplierId: idRef(SupplierSchema, "id"),
eventId: z.string().optional(),
+ priority: z.int().min(0).max(99).optional(), // A lower number represents a higher priority
url: z.url(),
createdAt: z.string(),
updatedAt: z.string(),
@@ -79,10 +80,12 @@ export type UpdateLetter = {
export const PendingLetterSchema = z.object({
supplierId: idRef(SupplierSchema, "id"),
letterId: idRef(LetterSchema, "id"),
- queueTimestamp: z.string().describe("Secondary index SK"),
+ queueTimestamp: z.string(),
visibilityTimestamp: z.string(),
+ queueSortOrderSk: z.string().describe("Secondary index SK"),
specificationId: z.string(),
groupId: z.string(),
+ priority: z.int().min(0).max(99).optional(),
ttl: z.int(),
});
@@ -90,7 +93,7 @@ export type PendingLetter = z.infer;
export type InsertPendingLetter = Omit<
PendingLetter,
- "ttl" | "queueTimestamp" | "visibilityTimestamp"
+ "ttl" | "queueTimestamp" | "visibilityTimestamp" | "queueSortOrderSk"
>;
export const MISchemaBase = z.object({
diff --git a/lambdas/supplier-allocator/package.json b/lambdas/supplier-allocator/package.json
index 1441424af..b37b16045 100644
--- a/lambdas/supplier-allocator/package.json
+++ b/lambdas/supplier-allocator/package.json
@@ -9,6 +9,7 @@
"@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1": "npm:@nhsdigital/nhs-notify-event-schemas-letter-rendering@^1.1.5",
"@nhsdigital/nhs-notify-event-schemas-supplier-api": "^1.0.8",
"@types/aws-lambda": "^8.10.148",
+ "aws-embedded-metrics": "^4.2.1",
"aws-lambda": "^1.0.7",
"esbuild": "^0.27.2",
"pino": "^9.7.0",
diff --git a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts
index dd013aebf..3ad3991ac 100644
--- a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts
+++ b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts
@@ -18,7 +18,8 @@ describe("lambdaEnv", () => {
process.env.VARIANT_MAP = `{
"lv1": {
"supplierId": "supplier1",
- "specId": "spec1"
+ "specId": "spec1",
+ "priority": 10
}
}`;
@@ -29,6 +30,7 @@ describe("lambdaEnv", () => {
lv1: {
supplierId: "supplier1",
specId: "spec1",
+ priority: 10,
},
},
});
diff --git a/lambdas/supplier-allocator/src/config/env.ts b/lambdas/supplier-allocator/src/config/env.ts
index 0adc39203..d1e07403f 100644
--- a/lambdas/supplier-allocator/src/config/env.ts
+++ b/lambdas/supplier-allocator/src/config/env.ts
@@ -5,6 +5,7 @@ const LetterVariantSchema = z.record(
z.object({
supplierId: z.string(),
specId: z.string(),
+ priority: z.int().min(0).max(99), // Lower number represents a higher priority
}),
);
export type LetterVariant = z.infer;
diff --git a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts
index 23fc5981c..dc9524e5e 100644
--- a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts
+++ b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts
@@ -7,10 +7,39 @@ import {
$LetterEvent,
LetterEvent,
} from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events";
+import { MetricStatus } from "@internal/helpers";
import createSupplierAllocatorHandler from "../allocate-handler";
import { Deps } from "../../config/deps";
import { EnvVars } from "../../config/env";
+function assertMetricLogged(
+ logger: pino.Logger,
+ supplier: string,
+ priority: string,
+ status: MetricStatus,
+ count: number,
+) {
+ expect(logger.info).toHaveBeenCalledWith(
+ expect.objectContaining({
+ Supplier: supplier,
+ Priority: priority,
+ _aws: expect.objectContaining({
+ CloudWatchMetrics: expect.arrayContaining([
+ expect.objectContaining({
+ Metrics: [
+ expect.objectContaining({
+ Name: status,
+ Value: count,
+ }),
+ ],
+ }),
+ ]),
+ }),
+ [status]: count,
+ }),
+ );
+}
+
function createSQSEvent(records: SQSRecord[]): SQSEvent {
return {
Records: records,
@@ -145,6 +174,7 @@ describe("createSupplierAllocatorHandler", () => {
lv1: {
supplierId: "supplier1",
specId: "spec1",
+ priority: 10,
},
},
} as EnvVars,
@@ -179,7 +209,16 @@ describe("createSupplierAllocatorHandler", () => {
expect(messageBody.supplierSpec).toEqual({
supplierId: "supplier1",
specId: "spec1",
+ priority: 10,
});
+
+ assertMetricLogged(
+ mockedDeps.logger,
+ "supplier1",
+ "10",
+ MetricStatus.Success,
+ 1,
+ );
});
test("parses SNS notification and sends message to SQS queue for v1 event", async () => {
@@ -205,6 +244,7 @@ describe("createSupplierAllocatorHandler", () => {
expect(messageBody.supplierSpec).toEqual({
supplierId: "supplier1",
specId: "spec1",
+ priority: 10,
});
});
@@ -226,6 +266,14 @@ describe("createSupplierAllocatorHandler", () => {
expect(result.batchItemFailures).toHaveLength(1);
expect(result.batchItemFailures[0].itemIdentifier).toBe("invalid-event");
expect((mockedDeps.logger.error as jest.Mock).mock.calls).toHaveLength(1);
+
+ assertMetricLogged(
+ mockedDeps.logger,
+ "unknown",
+ "unknown",
+ MetricStatus.Failure,
+ 1,
+ );
});
test("unwraps EventBridge envelope and extracts event details", async () => {
@@ -259,8 +307,11 @@ describe("createSupplierAllocatorHandler", () => {
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
const messageBody = JSON.parse(sendCall.input.MessageBody);
- expect(messageBody.supplierSpec.supplierId).toBe("supplier1");
- expect(messageBody.supplierSpec.specId).toBe("spec1");
+ expect(messageBody.supplierSpec).toEqual({
+ supplierId: "supplier1",
+ specId: "spec1",
+ priority: 10,
+ });
});
test("processes multiple messages in batch", async () => {
@@ -392,6 +443,14 @@ describe("createSupplierAllocatorHandler", () => {
expect(result.batchItemFailures).toHaveLength(1);
expect(result.batchItemFailures[0].itemIdentifier).toBe("msg1");
expect((mockedDeps.logger.error as jest.Mock).mock.calls).toHaveLength(1);
+
+ assertMetricLogged(
+ mockedDeps.logger,
+ "supplier1",
+ "10",
+ MetricStatus.Failure,
+ 1,
+ );
});
test("processes mixed batch with successes and failures", async () => {
@@ -417,6 +476,21 @@ describe("createSupplierAllocatorHandler", () => {
expect(result.batchItemFailures[0].itemIdentifier).toBe("fail-msg");
expect(mockSqsClient.send).toHaveBeenCalledTimes(2);
+
+ assertMetricLogged(
+ mockedDeps.logger,
+ "supplier1",
+ "10",
+ MetricStatus.Success,
+ 2,
+ );
+ assertMetricLogged(
+ mockedDeps.logger,
+ "unknown",
+ "unknown",
+ MetricStatus.Failure,
+ 1,
+ );
});
test("sends correct queue URL in SQS message command", async () => {
@@ -435,4 +509,48 @@ describe("createSupplierAllocatorHandler", () => {
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
expect(sendCall.input.QueueUrl).toBe(queueUrl);
});
+
+ test("emits separate metrics per supplier and priority combination", async () => {
+ mockedDeps.env.VARIANT_MAP = {
+ lv1: { supplierId: "supplier1", specId: "spec1", priority: 10 },
+ lv2: { supplierId: "supplier2", specId: "spec2", priority: 5 },
+ } as any;
+
+ const eventForSupplier1 = createPreparedV2Event({ domainId: "letter1" });
+ const eventForSupplier2 = {
+ ...createPreparedV2Event({ domainId: "letter2" }),
+ data: {
+ ...createPreparedV2Event({ domainId: "letter2" }).data,
+ letterVariantId: "lv2",
+ },
+ };
+
+ const evt: SQSEvent = createSQSEvent([
+ createSqsRecord("msg1", JSON.stringify(eventForSupplier1)),
+ createSqsRecord("msg2", JSON.stringify(eventForSupplier2)),
+ ]);
+
+ process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue";
+
+ const handler = createSupplierAllocatorHandler(mockedDeps);
+ const result = await handler(evt, {} as any, {} as any);
+ if (!result) throw new Error("expected BatchResponse, got void");
+
+ expect(result.batchItemFailures).toHaveLength(0);
+
+ assertMetricLogged(
+ mockedDeps.logger,
+ "supplier1",
+ "10",
+ MetricStatus.Success,
+ 1,
+ );
+ assertMetricLogged(
+ mockedDeps.logger,
+ "supplier2",
+ "5",
+ MetricStatus.Success,
+ 1,
+ );
+ });
});
diff --git a/lambdas/supplier-allocator/src/handler/allocate-handler.ts b/lambdas/supplier-allocator/src/handler/allocate-handler.ts
index 0402e2846..3fd4f3be5 100644
--- a/lambdas/supplier-allocator/src/handler/allocate-handler.ts
+++ b/lambdas/supplier-allocator/src/handler/allocate-handler.ts
@@ -4,9 +4,11 @@ import { LetterRequestPreparedEvent } from "@nhsdigital/nhs-notify-event-schemas
import { LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering";
import z from "zod";
+import { Unit } from "aws-embedded-metrics";
+import { MetricEntry, MetricStatus, buildEMFObject } from "@internal/helpers";
import { Deps } from "../config/deps";
-type SupplierSpec = { supplierId: string; specId: string };
+type SupplierSpec = { supplierId: string; specId: string; priority: number };
type PreparedEvents = LetterRequestPreparedEventV2 | LetterRequestPreparedEvent;
// small envelope that must exist in all inputs
@@ -50,11 +52,49 @@ function getSupplier(letterEvent: PreparedEvents, deps: Deps): SupplierSpec {
return resolveSupplierForVariant(letterEvent.data.letterVariantId, deps);
}
+type AllocationMetrics = Map>;
+
+function incrementMetric(
+ map: AllocationMetrics,
+ supplier: string,
+ priority: string,
+) {
+ const byPriority = map.get(supplier) ?? new Map();
+ byPriority.set(priority, (byPriority.get(priority) ?? 0) + 1);
+ map.set(supplier, byPriority);
+}
+
+function emitMetrics(
+ metrics: AllocationMetrics,
+ status: MetricStatus,
+ deps: Deps,
+) {
+ const namespace = "supplier-allocator";
+ for (const [supplier, byPriority] of metrics) {
+ for (const [priority, count] of byPriority) {
+ const dimensions: Record = {
+ Priority: priority,
+ Supplier: supplier,
+ };
+ const metric: MetricEntry = {
+ key: status,
+ value: count,
+ unit: Unit.Count,
+ };
+ deps.logger.info(buildEMFObject(namespace, dimensions, metric));
+ }
+ }
+}
+
export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler {
return async (event: SQSEvent) => {
const batchItemFailures: SQSBatchItemFailure[] = [];
+ const perAllocationSuccess: AllocationMetrics = new Map();
+ const perAllocationFailure: AllocationMetrics = new Map();
const tasks = event.Records.map(async (record) => {
+ let supplier = "unknown";
+ let priority = "unknown";
try {
const letterEvent: unknown = JSON.parse(record.body);
@@ -67,6 +107,9 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler {
const supplierSpec = getSupplier(letterEvent as PreparedEvents, deps);
+ supplier = supplierSpec.supplierId;
+ priority = String(supplierSpec.priority);
+
deps.logger.info({
description: "Resolved supplier spec",
supplierSpec,
@@ -95,6 +138,8 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler {
MessageBody: JSON.stringify(queueMessage),
}),
);
+
+ incrementMetric(perAllocationSuccess, supplier, priority);
} catch (error) {
deps.logger.error({
description: "Error processing allocation of record",
@@ -102,12 +147,15 @@ export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler {
messageId: record.messageId,
message: record.body,
});
+ incrementMetric(perAllocationFailure, supplier, priority);
batchItemFailures.push({ itemIdentifier: record.messageId });
}
});
await Promise.all(tasks);
+ emitMetrics(perAllocationSuccess, MetricStatus.Success, deps);
+ emitMetrics(perAllocationFailure, MetricStatus.Failure, deps);
return { batchItemFailures };
};
}
diff --git a/lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts b/lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts
index 801b79175..4775b00ea 100644
--- a/lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts
+++ b/lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts
@@ -6,7 +6,6 @@ import {
} from "@internal/datastore";
import { mockDeep } from "jest-mock-extended";
import pino from "pino";
-import { MetricStatus } from "@internal/helpers";
import {
Context,
DynamoDBRecord,
@@ -32,9 +31,12 @@ const mockedDeps: jest.Mocked = {
env: {} as unknown as EnvVars,
} as Deps;
-function generateLetter(status: LetterStatus, id?: string): Letter {
+function generateLetter(
+ status: LetterStatus,
+ overrides?: Partial,
+): Letter {
return {
- id: id || "1",
+ id: "1",
status,
specificationId: "spec1",
supplierId: "supplier1",
@@ -48,6 +50,7 @@ function generateLetter(status: LetterStatus, id?: string): Letter {
source: "test-source",
subject: "test-subject",
billingRef: "billing-ref-1",
+ ...overrides,
};
}
@@ -144,8 +147,8 @@ describe("update-letter-queue Lambda", () => {
it("returns on the first failure", async () => {
const handler = createHandler(mockedDeps);
- const newLetter1 = generateLetter("PENDING", "1");
- const newLetter2 = generateLetter("PENDING", "2");
+ const newLetter1 = generateLetter("PENDING", { id: "1" });
+ const newLetter2 = generateLetter("PENDING", { id: "2" });
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
.mockRejectedValueOnce({})
.mockResolvedValueOnce({});
@@ -164,8 +167,8 @@ describe("update-letter-queue Lambda", () => {
it("does not treat a replayed insert as a failure", async () => {
const handler = createHandler(mockedDeps);
- const newLetter1 = generateLetter("PENDING", "1");
- const newLetter2 = generateLetter("PENDING", "2");
+ const newLetter1 = generateLetter("PENDING", { id: "1" });
+ const newLetter2 = generateLetter("PENDING", { id: "2" });
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
.mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1"))
.mockResolvedValueOnce({});
@@ -181,10 +184,10 @@ describe("update-letter-queue Lambda", () => {
it("does not treat a replayed delete as a failure", async () => {
const handler = createHandler(mockedDeps);
- const oldLetter1 = generateLetter("PENDING", "1");
- const oldLetter2 = generateLetter("PENDING", "2");
- const newLetter1 = generateLetter("ACCEPTED", "1");
- const newLetter2 = generateLetter("ACCEPTED", "2");
+ const oldLetter1 = generateLetter("PENDING", { id: "1" });
+ const oldLetter2 = generateLetter("PENDING", { id: "2" });
+ const newLetter1 = generateLetter("ACCEPTED", { id: "1" });
+ const newLetter2 = generateLetter("ACCEPTED", { id: "2" });
(mockedDeps.letterQueueRepository.deleteLetter as jest.Mock)
.mockRejectedValueOnce(new LetterDoesNotExistError("supplier1", "1"))
.mockResolvedValueOnce({});
@@ -227,26 +230,51 @@ describe("update-letter-queue Lambda", () => {
});
describe("Metrics", () => {
- it("emits success metrics when all letters are processed successfully", async () => {
+ // eslint-disable-next-line jest/expect-expect
+ it("logs a metric containing the delta of pending letters added/deleted", async () => {
const handler = createHandler(mockedDeps);
- const oldLetter1 = generateLetter("PENDING", "1");
- const newLetter1 = generateLetter("ACCEPTED", "1");
- const newLetter2 = generateLetter("PENDING", "2");
+ const oldLetter1 = generateLetter("PENDING", { id: "1" });
+ const newLetter1 = generateLetter("ACCEPTED", { id: "1" });
+ const newLetter2 = generateLetter("PENDING", { id: "2" });
+ const newLetter3 = generateLetter("PENDING", { id: "3" });
const testData = generateKinesisEvent([
generateModifyRecord(oldLetter1, newLetter1),
generateInsertRecord(newLetter2),
+ generateInsertRecord(newLetter3),
]);
await handler(testData, mockDeep(), jest.fn());
- assertSuccessMetricLogged(2);
- assertFailureMetricLogged(0);
+ assertQueueDeltaMetricLogged("supplier1", 1);
});
- it("emits failure metrics when a letter fails to be inserted", async () => {
+ // eslint-disable-next-line jest/expect-expect
+ it("breaks the metric down by supplier", async () => {
const handler = createHandler(mockedDeps);
- const newLetter1 = generateLetter("PENDING", "1");
- const newLetter2 = generateLetter("PENDING", "2");
+ const oldLetter1 = generateLetter("PENDING", { id: "1" });
+ const newLetter1 = generateLetter("ACCEPTED", { id: "1" });
+ const newLetter2 = generateLetter("PENDING", {
+ supplierId: "supplier2",
+ id: "2",
+ });
+ const newLetter3 = generateLetter("PENDING", { id: "3" });
+
+ const testData = generateKinesisEvent([
+ generateModifyRecord(oldLetter1, newLetter1),
+ generateInsertRecord(newLetter2),
+ generateInsertRecord(newLetter3),
+ ]);
+ await handler(testData, mockDeep(), jest.fn());
+
+ assertQueueDeltaMetricLogged("supplier1", 0);
+ assertQueueDeltaMetricLogged("supplier2", 1);
+ });
+
+ // eslint-disable-next-line jest/expect-expect
+ it("counts a failed insert as zero", async () => {
+ const handler = createHandler(mockedDeps);
+ const newLetter1 = generateLetter("PENDING", { id: "1" });
+ const newLetter2 = generateLetter("PENDING", { id: "2" });
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
.mockResolvedValueOnce({})
.mockRejectedValueOnce(new Error("DynamoDB error"));
@@ -257,16 +285,16 @@ describe("update-letter-queue Lambda", () => {
]);
await handler(testData, mockDeep(), jest.fn());
- assertSuccessMetricLogged(1);
- assertFailureMetricLogged(1);
+ assertQueueDeltaMetricLogged("supplier1", 1);
});
- it("emits failure metrics when a letter fails to be deleted", async () => {
+ // eslint-disable-next-line jest/expect-expect
+ it("counts a failed delete as zero", async () => {
const handler = createHandler(mockedDeps);
- const oldLetter1 = generateLetter("PENDING", "1");
- const oldLetter2 = generateLetter("PENDING", "2");
- const newLetter1 = generateLetter("ACCEPTED", "1");
- const newLetter2 = generateLetter("ACCEPTED", "2");
+ const oldLetter1 = generateLetter("PENDING", { id: "1" });
+ const oldLetter2 = generateLetter("PENDING", { id: "2" });
+ const newLetter1 = generateLetter("ACCEPTED", { id: "1" });
+ const newLetter2 = generateLetter("ACCEPTED", { id: "2" });
(mockedDeps.letterQueueRepository.deleteLetter as jest.Mock)
.mockResolvedValueOnce({})
.mockRejectedValueOnce(new Error("DynamoDB error"));
@@ -277,14 +305,14 @@ describe("update-letter-queue Lambda", () => {
]);
await handler(testData, mockDeep(), jest.fn());
- assertSuccessMetricLogged(1);
- assertFailureMetricLogged(1);
+ assertQueueDeltaMetricLogged("supplier1", -1);
});
- it("does not count a replayed insert as a success or failure", async () => {
+ // eslint-disable-next-line jest/expect-expect
+ it("counts a replayed insert as zero", async () => {
const handler = createHandler(mockedDeps);
- const newLetter1 = generateLetter("PENDING", "1");
- const newLetter2 = generateLetter("PENDING", "2");
+ const newLetter1 = generateLetter("PENDING", { id: "1" });
+ const newLetter2 = generateLetter("PENDING", { id: "2" });
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
.mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1"))
@@ -296,16 +324,16 @@ describe("update-letter-queue Lambda", () => {
]);
await handler(testData, mockDeep(), jest.fn());
- assertSuccessMetricLogged(1);
- assertFailureMetricLogged(0);
+ assertQueueDeltaMetricLogged("supplier1", 1);
});
- it("does not count a replayed delete as a success or failure", async () => {
+ // eslint-disable-next-line jest/expect-expect
+ it("counts a replayed delete as zero", async () => {
const handler = createHandler(mockedDeps);
- const oldLetter1 = generateLetter("PENDING", "1");
- const oldLetter2 = generateLetter("PENDING", "2");
- const newLetter1 = generateLetter("ACCEPTED", "1");
- const newLetter2 = generateLetter("ACCEPTED", "2");
+ const oldLetter1 = generateLetter("PENDING", { id: "1" });
+ const oldLetter2 = generateLetter("PENDING", { id: "2" });
+ const newLetter1 = generateLetter("ACCEPTED", { id: "1" });
+ const newLetter2 = generateLetter("ACCEPTED", { id: "2" });
(mockedDeps.letterQueueRepository.deleteLetter as jest.Mock)
.mockRejectedValueOnce(new LetterDoesNotExistError("supplier1", "1"))
.mockResolvedValueOnce({});
@@ -316,19 +344,36 @@ describe("update-letter-queue Lambda", () => {
]);
await handler(testData, mockDeep(), jest.fn());
- assertSuccessMetricLogged(1);
- assertFailureMetricLogged(0);
+ assertQueueDeltaMetricLogged("supplier1", -1);
});
- it("emits zero success metrics when no pending letters are in the batch", async () => {
+ // eslint-disable-next-line jest/expect-expect
+ it("logs zero counts when no pending letters are in the batch", async () => {
const handler = createHandler(mockedDeps);
const newLetter = generateLetter("PRINTED");
const testData = generateKinesisEvent([generateInsertRecord(newLetter)]);
await handler(testData, mockDeep(), jest.fn());
- assertSuccessMetricLogged(0);
- assertFailureMetricLogged(0);
+ assertQueueDeltaMetricNotLogged();
+ });
+
+ it("skips records with no NewImage (e.g. DELETE events) without error", async () => {
+ const handler = createHandler(mockedDeps);
+ const deleteRecord: DynamoDBRecord = {
+ eventName: "REMOVE",
+ dynamodb: { OldImage: mapToImage(generateLetter("PENDING")) },
+ };
+
+ const testData = generateKinesisEvent([deleteRecord]);
+ const result = await handler(testData, mockDeep(), jest.fn());
+
+ expect(mockedDeps.letterQueueRepository.putLetter).not.toHaveBeenCalled();
+ expect(
+ mockedDeps.letterQueueRepository.deleteLetter,
+ ).not.toHaveBeenCalled();
+ expect(result.batchItemFailures).toEqual([]);
+ assertQueueDeltaMetricNotLogged();
});
});
});
@@ -375,43 +420,42 @@ function mapToImage(oldLetter: Letter) {
);
}
-function assertSuccessMetricLogged(count: number) {
+function assertQueueDeltaMetricLogged(supplierId: string, delta: number) {
expect(mockedDeps.logger.info).toHaveBeenCalledWith(
expect.objectContaining({
+ supplier: supplierId,
_aws: expect.objectContaining({
CloudWatchMetrics: expect.arrayContaining([
expect.objectContaining({
Metrics: [
expect.objectContaining({
- Name: MetricStatus.Success,
- Value: count,
+ Name: "QueueDelta",
+ Value: delta,
+ Unit: Unit.Count,
}),
],
}),
]),
}),
- success: count,
+ QueueDelta: delta,
}),
);
}
-function assertFailureMetricLogged(count: number) {
- expect(mockedDeps.logger.info).toHaveBeenCalledWith(
+function assertQueueDeltaMetricNotLogged() {
+ expect(mockedDeps.logger.info).not.toHaveBeenCalledWith(
expect.objectContaining({
_aws: expect.objectContaining({
CloudWatchMetrics: expect.arrayContaining([
expect.objectContaining({
Metrics: [
expect.objectContaining({
- Name: MetricStatus.Failure,
- Value: count,
- Unit: Unit.Count,
+ Name: "QueueDelta",
}),
],
}),
]),
}),
- failure: count,
}),
);
}
diff --git a/lambdas/update-letter-queue/src/update-letter-queue.ts b/lambdas/update-letter-queue/src/update-letter-queue.ts
index 392336fb2..a41dfc0b3 100644
--- a/lambdas/update-letter-queue/src/update-letter-queue.ts
+++ b/lambdas/update-letter-queue/src/update-letter-queue.ts
@@ -6,7 +6,7 @@ import {
} from "aws-lambda";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { Unit } from "aws-embedded-metrics";
-import { MetricStatus, buildEMFObject } from "@internal/helpers";
+import { buildEMFObject } from "@internal/helpers";
import {
InsertPendingLetter,
Letter,
@@ -20,6 +20,9 @@ export default function createHandler(deps: Deps): Handler {
return async (streamEvent: KinesisStreamEvent) => {
let successCount = 0;
+ // The change in the size of the pending letters queue, keyed by supplier
+ const deltasBySupplierId = new Map();
+
deps.logger.info({ description: "Received event", streamEvent });
deps.logger.info({
description: "Number of records",
@@ -31,11 +34,15 @@ export default function createHandler(deps: Deps): Handler {
try {
if (isNewPendingLetter(ddbRecord)) {
- const added = await addPendingLetterToQueue(ddbRecord, deps);
- successCount += added ? 1 : 0;
+ const letter = extractNewOrUpdatedLetter(ddbRecord);
+ const added = await addPendingLetterToQueue(letter, deps);
+ updateDeltas(deltasBySupplierId, letter.supplierId, added);
+ successCount += added;
} else if (isNoLongerPending(ddbRecord)) {
- const deleted = await deletePendingLetterFromQueue(ddbRecord, deps);
- successCount += deleted ? 1 : 0;
+ const letter = extractNewOrUpdatedLetter(ddbRecord);
+ const deleted = await deletePendingLetterFromQueue(letter, deps);
+ updateDeltas(deltasBySupplierId, letter.supplierId, -deleted);
+ successCount += deleted;
}
} catch (error) {
deps.logger.error({
@@ -43,7 +50,7 @@ export default function createHandler(deps: Deps): Handler {
error,
ddbRecord,
});
- recordProcessing(deps, successCount, 1);
+ recordProcessing(deps, successCount, 1, deltasBySupplierId);
// If we get a failure, return immediately without processing the remaining records. Since we are
// working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost.
// See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html
@@ -54,16 +61,15 @@ export default function createHandler(deps: Deps): Handler {
};
}
}
- recordProcessing(deps, successCount, 0);
+ recordProcessing(deps, successCount, 0, deltasBySupplierId);
return { batchItemFailures: [] };
};
}
async function addPendingLetterToQueue(
- ddbRecord: DynamoDBRecord,
+ letter: Letter,
deps: Deps,
-): Promise {
- const letter = extractNewLetter(ddbRecord);
+): Promise {
const pendingLetter = mapLetterToPendingLetter(letter);
try {
@@ -72,7 +78,7 @@ async function addPendingLetterToQueue(
pendingLetter,
});
await deps.letterQueueRepository.putLetter(pendingLetter);
- return true;
+ return 1;
} catch (error) {
if (error instanceof LetterAlreadyExistsError) {
deps.logger.warn({
@@ -80,17 +86,16 @@ async function addPendingLetterToQueue(
supplierId: pendingLetter.supplierId,
letterId: pendingLetter.letterId,
});
- return false;
+ return 0;
}
throw error;
}
}
async function deletePendingLetterFromQueue(
- ddbRecord: DynamoDBRecord,
+ letter: Letter,
deps: Deps,
-): Promise {
- const letter = extractNewLetter(ddbRecord);
+): Promise {
try {
deps.logger.info({
description: "Deleting pending letter",
@@ -98,7 +103,7 @@ async function deletePendingLetterFromQueue(
letterId: letter.id,
});
await deps.letterQueueRepository.deleteLetter(letter.supplierId, letter.id);
- return true;
+ return 1;
} catch (error) {
if (error instanceof LetterDoesNotExistError) {
deps.logger.warn({
@@ -106,7 +111,7 @@ async function deletePendingLetterFromQueue(
supplierId: letter.supplierId,
letterId: letter.id,
});
- return false;
+ return 0;
}
throw error;
}
@@ -116,6 +121,7 @@ function recordProcessing(
deps: Deps,
successCount: number,
failureCount: number,
+ deltasBySupplierId: Map,
) {
deps.logger.info({
description: "Processing complete",
@@ -124,8 +130,9 @@ function recordProcessing(
totalProcessed: successCount + failureCount,
});
- deps.logger.info(buildMetric(MetricStatus.Success, successCount));
- deps.logger.info(buildMetric(MetricStatus.Failure, failureCount));
+ for (const [supplierId, delta] of deltasBySupplierId) {
+ deps.logger.info(buildMetric(supplierId, delta));
+ }
}
function isNewPendingLetter(record: DynamoDBRecord): boolean {
@@ -172,8 +179,8 @@ function extractPayload(
}
}
-function extractNewLetter(record: DynamoDBRecord): Letter {
- const newImage = record.dynamodb?.NewImage!;
+function extractNewOrUpdatedLetter(record: DynamoDBRecord): Letter {
+ const newImage = record.dynamodb?.NewImage;
return LetterSchema.parse(unmarshall(newImage as any));
}
@@ -186,14 +193,23 @@ function mapLetterToPendingLetter(letter: Letter): InsertPendingLetter {
};
}
-function buildMetric(status: MetricStatus, count: number) {
+function buildMetric(supplierId: string, delta: number) {
return buildEMFObject(
"update-letter-queue",
- {},
+ { supplier: supplierId },
{
- key: status,
- value: count,
+ key: "QueueDelta",
+ value: delta,
unit: Unit.Count,
},
);
}
+
+function updateDeltas(
+ deltasBySupplierId: Map,
+ supplierId: string,
+ delta: number,
+): void {
+ const current = deltasBySupplierId.get(supplierId) ?? 0;
+ deltasBySupplierId.set(supplierId, current + delta);
+}
diff --git a/lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts b/lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts
index 6292de0f7..9f855d176 100644
--- a/lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts
+++ b/lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts
@@ -211,11 +211,11 @@ describe("createUpsertLetterHandler", () => {
test("processes all records successfully and returns no batch failures", async () => {
const v2message = {
letterEvent: createPreparedV2Event(),
- supplierSpec: { supplierId: "supplier1", specId: "spec1" },
+ supplierSpec: { supplierId: "supplier1", specId: "spec1", priority: 10 },
};
const v1message = {
letterEvent: createPreparedV1Event(),
- supplierSpec: { supplierId: "supplier1", specId: "spec1" },
+ supplierSpec: { supplierId: "supplier1", specId: "spec1", priority: 10 },
};
const evt: SQSEvent = createSQSEvent([
@@ -249,6 +249,7 @@ describe("createUpsertLetterHandler", () => {
expect(insertedV2Letter.status).toBe("PENDING");
expect(insertedV2Letter.groupId).toBe("client1campaign1template1");
expect(insertedV2Letter.source).toBe("/data-plane/letter-rendering/test");
+ expect(insertedV2Letter.priority).toBe(10);
const insertedV1Letter = (mockedDeps.letterRepo.putLetter as jest.Mock).mock
.calls[1][0];
@@ -260,6 +261,7 @@ describe("createUpsertLetterHandler", () => {
expect(insertedV1Letter.status).toBe("PENDING");
expect(insertedV1Letter.groupId).toBe("client1campaign1template1");
expect(insertedV1Letter.source).toBe("/data-plane/letter-rendering/test");
+ expect(insertedV1Letter.priority).toBe(10);
const updatedLetter = (
mockedDeps.letterRepo.updateLetterStatus as jest.Mock
@@ -472,14 +474,14 @@ describe("createUpsertLetterHandler", () => {
id: "7b9a03ca-342a-4150-b56b-989109c45615",
domainId: "ok",
}),
- supplierSpec: { supplierId: "supplier1", specId: "spec1" },
+ supplierSpec: { supplierId: "supplier1", specId: "spec1", priority: 10 },
};
const message2 = {
letterEvent: createPreparedV2Event({
id: "7b9a03ca-342a-4150-b56b-989109c45616",
domainId: "fail",
}),
- supplierSpec: { supplierId: "supplier1", specId: "spec1" },
+ supplierSpec: { supplierId: "supplier1", specId: "spec1", priority: 10 },
};
const evt: SQSEvent = createSQSEvent([
createSqsRecord("ok-msg", JSON.stringify(message1)),
diff --git a/lambdas/upsert-letter/src/handler/upsert-handler.ts b/lambdas/upsert-letter/src/handler/upsert-handler.ts
index ada416ec2..ab9bd9877 100644
--- a/lambdas/upsert-letter/src/handler/upsert-handler.ts
+++ b/lambdas/upsert-letter/src/handler/upsert-handler.ts
@@ -16,14 +16,16 @@ import z from "zod";
import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics";
import { Deps } from "../config/deps";
-type SupplierSpec = { supplierId: string; specId: string };
type PreparedEvents = LetterRequestPreparedEventV2 | LetterRequestPreparedEvent;
const SupplierSpecSchema = z.object({
supplierId: z.string().min(1),
specId: z.string().min(1),
+ priority: z.int().min(0).max(99).default(10),
});
+type SupplierSpec = z.infer;
+
const PreparedEventUnionSchema = z.discriminatedUnion("type", [
$LetterRequestPreparedEventV2,
$LetterRequestPreparedEvent,
@@ -63,6 +65,7 @@ function getOperationFromType(type: string): UpsertOperation {
supplierSpec.supplierId,
supplierSpec.specId,
supplierSpec.specId, // use specId for now
+ supplierSpec.priority,
);
await deps.letterRepo.putLetter(letterToInsert);
@@ -99,6 +102,7 @@ function mapToInsertLetter(
supplier: string,
spec: string,
billingRef: string,
+ priority: number,
): InsertLetter {
const now = new Date().toISOString();
return {
@@ -107,6 +111,7 @@ function mapToInsertLetter(
supplierId: supplier,
status: "PENDING",
specificationId: spec,
+ priority,
groupId:
upsertRequest.data.clientId +
upsertRequest.data.campaignId +
@@ -235,7 +240,11 @@ export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
await runUpsert(
operation,
letterEvent,
- supplierSpec ?? { supplierId: "unknown", specId: "unknown" },
+ supplierSpec ?? {
+ supplierId: "unknown",
+ specId: "unknown",
+ priority: 10,
+ },
deps,
);
diff --git a/package-lock.json b/package-lock.json
index d42cb9c5b..29f8fc0a4 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -239,6 +239,7 @@
"@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1": "npm:@nhsdigital/nhs-notify-event-schemas-letter-rendering@^1.1.5",
"@nhsdigital/nhs-notify-event-schemas-supplier-api": "^1.0.8",
"@types/aws-lambda": "^8.10.148",
+ "aws-embedded-metrics": "^4.2.1",
"aws-lambda": "^1.0.7",
"esbuild": "^0.27.2",
"pino": "^9.7.0",