From 0c10dd206a5d6ff4f2189048c38f328ef9414c96 Mon Sep 17 00:00:00 2001 From: Angel Pastor Date: Wed, 20 May 2026 13:59:35 +0100 Subject: [PATCH 1/3] CCM-18337: Suggestion from Copilot, added independent file to check memory usage of previous approach vs new one --- memory-comparison.js | 217 ++++++++++++++++++++++ utils/utils/src/s3-utils/get-object-s3.ts | 12 +- 2 files changed, 224 insertions(+), 5 deletions(-) create mode 100644 memory-comparison.js diff --git a/memory-comparison.js b/memory-comparison.js new file mode 100644 index 000000000..032f8d429 --- /dev/null +++ b/memory-comparison.js @@ -0,0 +1,217 @@ +'use strict'; + +/** + * Memory comparison: streamToString approaches + * + * Run with: node --expose-gc memory-comparison.js + * + * --expose-gc enables global.gc() so the heap is fully collected between + * runs, giving a clean baseline for each measurement. + */ + +const { Readable } = require('node:stream'); +const { StringDecoder } = require('node:string_decoder'); + +// ── Configuration ───────────────────────────────────────────────────────────── + +const FILE_SIZE_MB = 6; +const CHUNK_SIZE_KB = 64; // matches Node.js stream default highWaterMark + +// ── Simulated S3 stream ─────────────────────────────────────────────────────── + +// Allocated once outside the runs so it is not counted in per-run measurements. +const SOURCE = Buffer.allocUnsafe(FILE_SIZE_MB * 1024 * 1024).fill(0x41); + +function makeStream() { + const chunkSize = CHUNK_SIZE_KB * 1024; + let offset = 0; + return new Readable({ + read() { + if (offset >= SOURCE.length) { + this.push(null); + return; + } + // subarray shares memory with SOURCE — no copy, matches how the S3 SDK + // emits chunks from a streaming GetObject response. + this.push(SOURCE.subarray(offset, offset + chunkSize)); + offset += chunkSize; + }, + }); +} + +// ── Approach 1: current code ────────────────────────────────────────────────── + +function currentApproach(stream) { + return new Promise((resolve, reject) => { + const chunks = []; + const snapshots = {}; + + stream.on('data', (chunk) => chunks.push(Buffer.from(chunk))); + stream.on('error', reject); + stream.on('end', () => { + // Measure at each synchronous step so we can see the stacked allocations. + // Buffer data bytes live in `arrayBuffers`, not `heapUsed`. + snapshots.beforeConcat = snapshot(); + + const buf = Buffer.concat(chunks); + // chunks[] + buf both live: arrayBuffers should be ~2× file size here. + snapshots.afterConcat = snapshot(); + + const result = buf.toString('utf8'); + // Peak: chunks[] (arrayBuffers) + buf (arrayBuffers) + string (heapUsed). + snapshots.afterToString = snapshot(); + + resolve({ result, snapshots }); + }); + }); +} + +// ── Approach 2: StringDecoder ───────────────────────────────────────────────── + +function decoderApproach(stream) { + return new Promise((resolve, reject) => { + const decoder = new StringDecoder('utf8'); + let accumulated = ''; + const chunkSnapshots = []; + + stream.on('data', (chunk) => { + const before = snapshot(); + accumulated += decoder.write(chunk); + const after = snapshot(); + chunkSnapshots.push({ before, after }); + }); + stream.on('error', reject); + stream.on('end', () => { + const result = accumulated + decoder.end(); + const afterEnd = snapshot(); + resolve({ result, chunkSnapshots, afterEnd }); + }); + }); +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +const mb = (bytes) => `${(bytes / 1024 / 1024).toFixed(2)} MB`; +const sign = (bytes) => `${bytes >= 0 ? '+' : ''}${mb(bytes)}`; +const sep = () => console.log('─'.repeat(62)); + +/** + * Returns the three memory fields relevant to this comparison: + * + * heapUsed – V8 object heap (JS strings, arrays, closures, etc.) + * arrayBuffers – backing store for ArrayBuffer / SharedArrayBuffer / Buffer + * data bytes (lives OUTSIDE the V8 heap) + * rss – total resident set (catches anything the above misses) + * + * Buffer data bytes sit in `arrayBuffers`, not `heapUsed`. + * String data bytes sit in `heapUsed`. + * Both must be tracked to see the full picture. + */ +function snapshot() { + const m = process.memoryUsage(); + return { heapUsed: m.heapUsed, arrayBuffers: m.arrayBuffers, rss: m.rss }; +} + +function forceGc() { + if (global.gc) { + global.gc(); + global.gc(); // second pass collects objects promoted after the first + } +} + +// ── Runners ─────────────────────────────────────────────────────────────────── + +// Each runner is a self-contained async function so the result string goes out +// of scope when it returns, letting GC collect it before the next run starts. + +async function runCurrent() { + forceGc(); + const baseline = snapshot(); + const b = { heapUsed: baseline.heapUsed, arrayBuffers: baseline.arrayBuffers }; + + const { result, snapshots } = await currentApproach(makeStream()); + + forceGc(); + const afterGc = snapshot(); + + sep(); + console.log('CURRENT – chunks[] + Buffer.concat + .toString()'); + sep(); + console.log(`Result length : ${mb(result.length)}`); + console.log(`Baseline : heap ${mb(b.heapUsed)} buf ${mb(b.arrayBuffers)} rss ${mb(baseline.rss)}`); + console.log(`Before concat : ${fmt(snapshots.beforeConcat, b)} ← chunks[] populated (buf ≈ file size)`); + console.log(`After concat : ${fmt(snapshots.afterConcat, b)} ← chunks[] + concat buf (buf ≈ 2× file)`); + console.log(`After toString: ${fmt(snapshots.afterToString, b)} ← peak: buf + string (heap ≈ file size)`); + console.log(`After GC : heap ${mb(afterGc.heapUsed)} buf ${mb(afterGc.arrayBuffers)} rss ${mb(afterGc.rss)}`); + + return (snapshots.afterToString.heapUsed + snapshots.afterToString.arrayBuffers) + - (b.heapUsed + b.arrayBuffers); +} + +async function runProposed() { + forceGc(); + const baseline = snapshot(); + const b = { heapUsed: baseline.heapUsed, arrayBuffers: baseline.arrayBuffers }; + + const { result, chunkSnapshots, afterEnd } = await decoderApproach(makeStream()); + + forceGc(); + const afterGc = snapshot(); + + const maxHeapDelta = Math.max(...chunkSnapshots.map(s => s.after.heapUsed - s.before.heapUsed)); + const maxBufDelta = Math.max(...chunkSnapshots.map(s => s.after.arrayBuffers - s.before.arrayBuffers)); + + sep(); + console.log('PROPOSED – StringDecoder, incremental accumulation'); + sep(); + console.log(`Result length : ${mb(result.length)}`); + console.log(`Baseline : heap ${mb(b.heapUsed)} buf ${mb(b.arrayBuffers)} rss ${mb(baseline.rss)}`); + console.log(`Max chunk Δ : Δheap ${sign(maxHeapDelta)} Δbuf ${sign(maxBufDelta)} (per data event, ~1 chunk)`); + console.log(`After 'end' : ${fmt(afterEnd, b)} ← string fully built, no concat buf`); + console.log(`After GC : heap ${mb(afterGc.heapUsed)} buf ${mb(afterGc.arrayBuffers)} rss ${mb(afterGc.rss)}`); + + return (afterEnd.heapUsed + afterEnd.arrayBuffers) - (b.heapUsed + b.arrayBuffers); +} + +// ── Main ────────────────────────────────────────────────────────────────────── +// +// Usage: +// node --expose-gc memory-comparison.js # both (default) +// node --expose-gc memory-comparison.js current +// node --expose-gc memory-comparison.js proposed + +const fmt = (s, b) => + `heap ${mb(s.heapUsed)} buf ${mb(s.arrayBuffers)} rss ${mb(s.rss)}` + + ` (heap ${sign(s.heapUsed - b.heapUsed)} buf ${sign(s.arrayBuffers - b.arrayBuffers)})`; + +(async () => { + const mode = process.argv[2] ?? 'both'; + const valid = ['current', 'proposed', 'both']; + if (!valid.includes(mode)) { + console.error(`Unknown mode "${mode}". Use: current | proposed | both`); + process.exit(1); + } + + const numChunks = Math.ceil((FILE_SIZE_MB * 1024) / CHUNK_SIZE_KB); + console.log(`\nNode.js ${process.version} | mode: ${mode}`); + console.log(`File size : ${FILE_SIZE_MB} MB | Chunk size: ${CHUNK_SIZE_KB} KB | ${numChunks} chunks`); + if (!global.gc) { + console.warn('\nWARNING: run with --expose-gc for accurate baselines\n'); + } + + let currentPeak, proposedPeak; + + if (mode === 'current' || mode === 'both') currentPeak = await runCurrent(); + if (mode === 'proposed' || mode === 'both') proposedPeak = await runProposed(); + + if (mode === 'both') { + const reduction = currentPeak - proposedPeak; + sep(); + console.log('SUMMARY (heap + arrayBuffers combined overhead above each baseline)'); + sep(); + console.log(`Current peak : ${sign(currentPeak)}`); + console.log(`Proposed peak : ${sign(proposedPeak)}`); + console.log(`Reduction : ${sign(reduction)} (${((reduction / currentPeak) * 100).toFixed(1)}%)`); + sep(); + } +})(); diff --git a/utils/utils/src/s3-utils/get-object-s3.ts b/utils/utils/src/s3-utils/get-object-s3.ts index 8d541ed3f..c66d130a6 100644 --- a/utils/utils/src/s3-utils/get-object-s3.ts +++ b/utils/utils/src/s3-utils/get-object-s3.ts @@ -1,4 +1,5 @@ import { type Readable } from 'node:stream'; +import { StringDecoder } from 'node:string_decoder'; import { GetObjectCommand, GetObjectCommandOutput, @@ -36,12 +37,13 @@ export interface S3Location { async function streamToString(Body: Readable) { return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - Body.on('data', (chunk: ArrayBuffer | SharedArrayBuffer) => - chunks.push(Buffer.from(chunk)), - ); + const decoder = new StringDecoder('utf8'); + let result = ''; + Body.on('data', (chunk: Buffer) => { + result += decoder.write(chunk); + }); Body.on('error', (err) => reject(err)); - Body.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); + Body.on('end', () => resolve(result + decoder.end())); }); } From 129306c49d360c132c33005fa0b169f8183c15a6 Mon Sep 17 00:00:00 2001 From: simonlabarere Date: Thu, 28 May 2026 10:22:36 +0100 Subject: [PATCH 2/3] CCM-18337: Increase PDM Uploader memory --- .../terraform/components/dl/module_lambda_pdm_uploader.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infrastructure/terraform/components/dl/module_lambda_pdm_uploader.tf b/infrastructure/terraform/components/dl/module_lambda_pdm_uploader.tf index 6c4f7e8a4..f9181e330 100644 --- a/infrastructure/terraform/components/dl/module_lambda_pdm_uploader.tf +++ b/infrastructure/terraform/components/dl/module_lambda_pdm_uploader.tf @@ -24,7 +24,7 @@ module "pdm_uploader" { function_include_common = true handler_function_name = "handler" runtime = "nodejs22.x" - memory = 256 + memory = 512 timeout = var.lambda_timeout_seconds log_level = var.log_level From 8e8a1e0fa774f196d75f0f58dfda16775cfee5a7 Mon Sep 17 00:00:00 2001 From: simonlabarere Date: Thu, 28 May 2026 10:26:43 +0100 Subject: [PATCH 3/3] CCM-18337: Delete memory-comparison.js --- memory-comparison.js | 217 ------------------------------------------- 1 file changed, 217 deletions(-) delete mode 100644 memory-comparison.js diff --git a/memory-comparison.js b/memory-comparison.js deleted file mode 100644 index 032f8d429..000000000 --- a/memory-comparison.js +++ /dev/null @@ -1,217 +0,0 @@ -'use strict'; - -/** - * Memory comparison: streamToString approaches - * - * Run with: node --expose-gc memory-comparison.js - * - * --expose-gc enables global.gc() so the heap is fully collected between - * runs, giving a clean baseline for each measurement. - */ - -const { Readable } = require('node:stream'); -const { StringDecoder } = require('node:string_decoder'); - -// ── Configuration ───────────────────────────────────────────────────────────── - -const FILE_SIZE_MB = 6; -const CHUNK_SIZE_KB = 64; // matches Node.js stream default highWaterMark - -// ── Simulated S3 stream ─────────────────────────────────────────────────────── - -// Allocated once outside the runs so it is not counted in per-run measurements. -const SOURCE = Buffer.allocUnsafe(FILE_SIZE_MB * 1024 * 1024).fill(0x41); - -function makeStream() { - const chunkSize = CHUNK_SIZE_KB * 1024; - let offset = 0; - return new Readable({ - read() { - if (offset >= SOURCE.length) { - this.push(null); - return; - } - // subarray shares memory with SOURCE — no copy, matches how the S3 SDK - // emits chunks from a streaming GetObject response. - this.push(SOURCE.subarray(offset, offset + chunkSize)); - offset += chunkSize; - }, - }); -} - -// ── Approach 1: current code ────────────────────────────────────────────────── - -function currentApproach(stream) { - return new Promise((resolve, reject) => { - const chunks = []; - const snapshots = {}; - - stream.on('data', (chunk) => chunks.push(Buffer.from(chunk))); - stream.on('error', reject); - stream.on('end', () => { - // Measure at each synchronous step so we can see the stacked allocations. - // Buffer data bytes live in `arrayBuffers`, not `heapUsed`. - snapshots.beforeConcat = snapshot(); - - const buf = Buffer.concat(chunks); - // chunks[] + buf both live: arrayBuffers should be ~2× file size here. - snapshots.afterConcat = snapshot(); - - const result = buf.toString('utf8'); - // Peak: chunks[] (arrayBuffers) + buf (arrayBuffers) + string (heapUsed). - snapshots.afterToString = snapshot(); - - resolve({ result, snapshots }); - }); - }); -} - -// ── Approach 2: StringDecoder ───────────────────────────────────────────────── - -function decoderApproach(stream) { - return new Promise((resolve, reject) => { - const decoder = new StringDecoder('utf8'); - let accumulated = ''; - const chunkSnapshots = []; - - stream.on('data', (chunk) => { - const before = snapshot(); - accumulated += decoder.write(chunk); - const after = snapshot(); - chunkSnapshots.push({ before, after }); - }); - stream.on('error', reject); - stream.on('end', () => { - const result = accumulated + decoder.end(); - const afterEnd = snapshot(); - resolve({ result, chunkSnapshots, afterEnd }); - }); - }); -} - -// ── Helpers ─────────────────────────────────────────────────────────────────── - -const mb = (bytes) => `${(bytes / 1024 / 1024).toFixed(2)} MB`; -const sign = (bytes) => `${bytes >= 0 ? '+' : ''}${mb(bytes)}`; -const sep = () => console.log('─'.repeat(62)); - -/** - * Returns the three memory fields relevant to this comparison: - * - * heapUsed – V8 object heap (JS strings, arrays, closures, etc.) - * arrayBuffers – backing store for ArrayBuffer / SharedArrayBuffer / Buffer - * data bytes (lives OUTSIDE the V8 heap) - * rss – total resident set (catches anything the above misses) - * - * Buffer data bytes sit in `arrayBuffers`, not `heapUsed`. - * String data bytes sit in `heapUsed`. - * Both must be tracked to see the full picture. - */ -function snapshot() { - const m = process.memoryUsage(); - return { heapUsed: m.heapUsed, arrayBuffers: m.arrayBuffers, rss: m.rss }; -} - -function forceGc() { - if (global.gc) { - global.gc(); - global.gc(); // second pass collects objects promoted after the first - } -} - -// ── Runners ─────────────────────────────────────────────────────────────────── - -// Each runner is a self-contained async function so the result string goes out -// of scope when it returns, letting GC collect it before the next run starts. - -async function runCurrent() { - forceGc(); - const baseline = snapshot(); - const b = { heapUsed: baseline.heapUsed, arrayBuffers: baseline.arrayBuffers }; - - const { result, snapshots } = await currentApproach(makeStream()); - - forceGc(); - const afterGc = snapshot(); - - sep(); - console.log('CURRENT – chunks[] + Buffer.concat + .toString()'); - sep(); - console.log(`Result length : ${mb(result.length)}`); - console.log(`Baseline : heap ${mb(b.heapUsed)} buf ${mb(b.arrayBuffers)} rss ${mb(baseline.rss)}`); - console.log(`Before concat : ${fmt(snapshots.beforeConcat, b)} ← chunks[] populated (buf ≈ file size)`); - console.log(`After concat : ${fmt(snapshots.afterConcat, b)} ← chunks[] + concat buf (buf ≈ 2× file)`); - console.log(`After toString: ${fmt(snapshots.afterToString, b)} ← peak: buf + string (heap ≈ file size)`); - console.log(`After GC : heap ${mb(afterGc.heapUsed)} buf ${mb(afterGc.arrayBuffers)} rss ${mb(afterGc.rss)}`); - - return (snapshots.afterToString.heapUsed + snapshots.afterToString.arrayBuffers) - - (b.heapUsed + b.arrayBuffers); -} - -async function runProposed() { - forceGc(); - const baseline = snapshot(); - const b = { heapUsed: baseline.heapUsed, arrayBuffers: baseline.arrayBuffers }; - - const { result, chunkSnapshots, afterEnd } = await decoderApproach(makeStream()); - - forceGc(); - const afterGc = snapshot(); - - const maxHeapDelta = Math.max(...chunkSnapshots.map(s => s.after.heapUsed - s.before.heapUsed)); - const maxBufDelta = Math.max(...chunkSnapshots.map(s => s.after.arrayBuffers - s.before.arrayBuffers)); - - sep(); - console.log('PROPOSED – StringDecoder, incremental accumulation'); - sep(); - console.log(`Result length : ${mb(result.length)}`); - console.log(`Baseline : heap ${mb(b.heapUsed)} buf ${mb(b.arrayBuffers)} rss ${mb(baseline.rss)}`); - console.log(`Max chunk Δ : Δheap ${sign(maxHeapDelta)} Δbuf ${sign(maxBufDelta)} (per data event, ~1 chunk)`); - console.log(`After 'end' : ${fmt(afterEnd, b)} ← string fully built, no concat buf`); - console.log(`After GC : heap ${mb(afterGc.heapUsed)} buf ${mb(afterGc.arrayBuffers)} rss ${mb(afterGc.rss)}`); - - return (afterEnd.heapUsed + afterEnd.arrayBuffers) - (b.heapUsed + b.arrayBuffers); -} - -// ── Main ────────────────────────────────────────────────────────────────────── -// -// Usage: -// node --expose-gc memory-comparison.js # both (default) -// node --expose-gc memory-comparison.js current -// node --expose-gc memory-comparison.js proposed - -const fmt = (s, b) => - `heap ${mb(s.heapUsed)} buf ${mb(s.arrayBuffers)} rss ${mb(s.rss)}` + - ` (heap ${sign(s.heapUsed - b.heapUsed)} buf ${sign(s.arrayBuffers - b.arrayBuffers)})`; - -(async () => { - const mode = process.argv[2] ?? 'both'; - const valid = ['current', 'proposed', 'both']; - if (!valid.includes(mode)) { - console.error(`Unknown mode "${mode}". Use: current | proposed | both`); - process.exit(1); - } - - const numChunks = Math.ceil((FILE_SIZE_MB * 1024) / CHUNK_SIZE_KB); - console.log(`\nNode.js ${process.version} | mode: ${mode}`); - console.log(`File size : ${FILE_SIZE_MB} MB | Chunk size: ${CHUNK_SIZE_KB} KB | ${numChunks} chunks`); - if (!global.gc) { - console.warn('\nWARNING: run with --expose-gc for accurate baselines\n'); - } - - let currentPeak, proposedPeak; - - if (mode === 'current' || mode === 'both') currentPeak = await runCurrent(); - if (mode === 'proposed' || mode === 'both') proposedPeak = await runProposed(); - - if (mode === 'both') { - const reduction = currentPeak - proposedPeak; - sep(); - console.log('SUMMARY (heap + arrayBuffers combined overhead above each baseline)'); - sep(); - console.log(`Current peak : ${sign(currentPeak)}`); - console.log(`Proposed peak : ${sign(proposedPeak)}`); - console.log(`Reduction : ${sign(reduction)} (${((reduction / currentPeak) * 100).toFixed(1)}%)`); - sep(); - } -})();