CCM-18337: Fix PDM Out Of Memory error #378

Merged
simonlabarere merged 4 commits into main from feature/CCM-18337_oom_pdm
Jun 1, 2026

@simonlabarere simonlabarere commented May 28, 2026

Description

Changes:

  • Doubled memory allocated to PDM Uploader (256 MB -> 512 MB).
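  • Refactored streamToString so that it no longer holds three copies of the file in memory at once (the chunk array, the concatenated Buffer, and the decoded string). The comparison script in this description measures the memory saved.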
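
For readers who want the shape of the refactor without reading the whole comparison script, here is a minimal sketch of a decoder-based streamToString. It assumes the function follows the StringDecoder approach measured in that script; the committed implementation may differ, and the demo stream and sample text are purely illustrative. Each chunk is decoded as it arrives, so no chunk array or concatenated Buffer is retained.

```javascript
'use strict';

const { Readable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');

// Hypothetical sketch: the function committed in this PR may differ.
function streamToString(stream) {
  return new Promise((resolve, reject) => {
    // StringDecoder buffers incomplete multi-byte sequences between writes,
    // so a character split across two chunks is still decoded correctly.
    const decoder = new StringDecoder('utf8');
    let text = '';

    stream.on('data', (chunk) => {
      text += decoder.write(chunk);
    });
    stream.on('error', reject);
    // decoder.end() flushes any bytes still held by the decoder.
    stream.on('end', () => resolve(text + decoder.end()));
  });
}

// Usage: the euro sign is 3 bytes in UTF-8 and is deliberately split
// across the two chunks of this illustrative stream.
const bytes = Buffer.from('price: 5 €', 'utf8');
const split = bytes.length - 2;
const demo = Readable.from([bytes.subarray(0, split), bytes.subarray(split)]);

streamToString(demo).then((text) => console.log(text));
```

Calling chunk.toString('utf8') on each chunk separately would corrupt a character that straddles a chunk boundary, which is why the sketch uses a decoder rather than plain per-chunk conversion.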

Memory comparison test (not committed):

'use strict';

/**
 * Memory comparison: streamToString approaches
 *
 * Run with:  node --expose-gc memory-comparison.js
 *
 * --expose-gc enables global.gc() so the heap is fully collected between
 * runs, giving a clean baseline for each measurement.
 */

const { Readable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');

// ── Configuration ─────────────────────────────────────────────────────────────

const FILE_SIZE_MB = 6;
const CHUNK_SIZE_KB = 64; // default stream highWaterMark in Node.js 22+ (16 KB in earlier versions)

// ── Simulated S3 stream ───────────────────────────────────────────────────────

// Allocated once outside the runs so it is not counted in per-run measurements.
const SOURCE = Buffer.allocUnsafe(FILE_SIZE_MB * 1024 * 1024).fill(0x41);

function makeStream() {
  const chunkSize = CHUNK_SIZE_KB * 1024;
  let offset = 0;
  return new Readable({
    read() {
      if (offset >= SOURCE.length) {
        this.push(null);
        return;
      }
      // subarray shares memory with SOURCE — no copy, matches how the S3 SDK
      // emits chunks from a streaming GetObject response.
      this.push(SOURCE.subarray(offset, offset + chunkSize));
      offset += chunkSize;
    },
  });
}

// ── Approach 1: current code ──────────────────────────────────────────────────

function currentApproach(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    const snapshots = {};

    stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
    stream.on('error', reject);
    stream.on('end', () => {
      // Measure at each synchronous step so we can see the stacked allocations.
      // Buffer data bytes live in `arrayBuffers`, not `heapUsed`.
      snapshots.beforeConcat = snapshot();

      const buf = Buffer.concat(chunks);
      // chunks[] + buf both live: arrayBuffers should be ~2× file size here.
      snapshots.afterConcat = snapshot();

      const result = buf.toString('utf8');
      // Peak: chunks[] (arrayBuffers) + buf (arrayBuffers) + string (heapUsed).
      snapshots.afterToString = snapshot();

      resolve({ result, snapshots });
    });
  });
}

// ── Approach 2: StringDecoder ─────────────────────────────────────────────────

function decoderApproach(stream) {
  return new Promise((resolve, reject) => {
    const decoder = new StringDecoder('utf8');
    let accumulated = '';
    const chunkSnapshots = [];

    stream.on('data', (chunk) => {
      const before = snapshot();
      accumulated += decoder.write(chunk);
      const after = snapshot();
      chunkSnapshots.push({ before, after });
    });
    stream.on('error', reject);
    stream.on('end', () => {
      const result = accumulated + decoder.end();
      const afterEnd = snapshot();
      resolve({ result, chunkSnapshots, afterEnd });
    });
  });
}

// ── Helpers ───────────────────────────────────────────────────────────────────

const mb = (bytes) => `${(bytes / 1024 / 1024).toFixed(2)} MB`;
const sign = (bytes) => `${bytes >= 0 ? '+' : ''}${mb(bytes)}`;
const sep = () => console.log('─'.repeat(62));

/**
 * Returns the three memory fields relevant to this comparison:
 *
 *   heapUsed     – V8 object heap (JS strings, arrays, closures, etc.)
 *   arrayBuffers – backing store for ArrayBuffer / SharedArrayBuffer / Buffer
 *                  data bytes (lives OUTSIDE the V8 heap)
 *   rss          – total resident set (catches anything the above misses)
 *
 * Buffer data bytes sit in `arrayBuffers`, not `heapUsed`.
 * String data bytes sit in `heapUsed`.
 * Both must be tracked to see the full picture.
 */
function snapshot() {
  const m = process.memoryUsage();
  return { heapUsed: m.heapUsed, arrayBuffers: m.arrayBuffers, rss: m.rss };
}

function forceGc() {
  if (global.gc) {
    global.gc();
    global.gc(); // second pass collects objects promoted after the first
  }
}

// ── Runners ───────────────────────────────────────────────────────────────────

// Each runner is a self-contained async function so the result string goes out
// of scope when it returns, letting GC collect it before the next run starts.

async function runCurrent() {
  forceGc();
  const baseline = snapshot();
  const b = { heapUsed: baseline.heapUsed, arrayBuffers: baseline.arrayBuffers };

  const { result, snapshots } = await currentApproach(makeStream());

  forceGc();
  const afterGc = snapshot();

  sep();
  console.log('CURRENT  –  chunks[] + Buffer.concat + .toString()');
  sep();
  console.log(`Result length  : ${mb(result.length)}`);
  console.log(`Baseline       : heap ${mb(b.heapUsed)}  buf ${mb(b.arrayBuffers)}  rss ${mb(baseline.rss)}`);
  console.log(`Before concat  : ${fmt(snapshots.beforeConcat,  b)}  ← chunks[] populated (buf ≈ file size)`);
  console.log(`After  concat  : ${fmt(snapshots.afterConcat,   b)}  ← chunks[] + concat buf (buf ≈ 2× file)`);
  console.log(`After  toString: ${fmt(snapshots.afterToString, b)}  ← peak: buf + string (heap ≈ file size)`);
  console.log(`After  GC      : heap ${mb(afterGc.heapUsed)}  buf ${mb(afterGc.arrayBuffers)}  rss ${mb(afterGc.rss)}`);

  return (snapshots.afterToString.heapUsed + snapshots.afterToString.arrayBuffers)
    - (b.heapUsed + b.arrayBuffers);
}

async function runProposed() {
  forceGc();
  const baseline = snapshot();
  const b = { heapUsed: baseline.heapUsed, arrayBuffers: baseline.arrayBuffers };

  const { result, chunkSnapshots, afterEnd } = await decoderApproach(makeStream());

  forceGc();
  const afterGc = snapshot();

  const maxHeapDelta = Math.max(...chunkSnapshots.map(s => s.after.heapUsed     - s.before.heapUsed));
  const maxBufDelta  = Math.max(...chunkSnapshots.map(s => s.after.arrayBuffers - s.before.arrayBuffers));

  sep();
  console.log('PROPOSED –  StringDecoder, incremental accumulation');
  sep();
  console.log(`Result length  : ${mb(result.length)}`);
  console.log(`Baseline       : heap ${mb(b.heapUsed)}  buf ${mb(b.arrayBuffers)}  rss ${mb(baseline.rss)}`);
  console.log(`Max chunk Δ    : Δheap ${sign(maxHeapDelta)}  Δbuf ${sign(maxBufDelta)}  (per data event, ~1 chunk)`);
  console.log(`After 'end'    : ${fmt(afterEnd, b)}  ← string fully built, no concat buf`);
  console.log(`After  GC      : heap ${mb(afterGc.heapUsed)}  buf ${mb(afterGc.arrayBuffers)}  rss ${mb(afterGc.rss)}`);

  return (afterEnd.heapUsed + afterEnd.arrayBuffers) - (b.heapUsed + b.arrayBuffers);
}

// ── Main ──────────────────────────────────────────────────────────────────────
//
// Usage:
//   node --expose-gc memory-comparison.js           # both (default)
//   node --expose-gc memory-comparison.js current
//   node --expose-gc memory-comparison.js proposed

const fmt = (s, b) =>
  `heap ${mb(s.heapUsed)}  buf ${mb(s.arrayBuffers)}  rss ${mb(s.rss)}` +
  `  (heap ${sign(s.heapUsed - b.heapUsed)}  buf ${sign(s.arrayBuffers - b.arrayBuffers)})`;

(async () => {
  const mode = process.argv[2] ?? 'both';
  const valid = ['current', 'proposed', 'both'];
  if (!valid.includes(mode)) {
    console.error(`Unknown mode "${mode}". Use: current | proposed | both`);
    process.exit(1);
  }

  const numChunks = Math.ceil((FILE_SIZE_MB * 1024) / CHUNK_SIZE_KB);
  console.log(`\nNode.js ${process.version}  |  mode: ${mode}`);
  console.log(`File size : ${FILE_SIZE_MB} MB  |  Chunk size: ${CHUNK_SIZE_KB} KB  |  ${numChunks} chunks`);
  if (!global.gc) {
    console.warn('\nWARNING: run with --expose-gc for accurate baselines\n');
  }

  let currentPeak, proposedPeak;

  if (mode === 'current' || mode === 'both')  currentPeak  = await runCurrent();
  if (mode === 'proposed' || mode === 'both') proposedPeak = await runProposed();

  if (mode === 'both') {
    const reduction = currentPeak - proposedPeak;
    sep();
    console.log('SUMMARY  (heap + arrayBuffers combined overhead above each baseline)');
    sep();
    console.log(`Current  peak : ${sign(currentPeak)}`);
    console.log(`Proposed peak : ${sign(proposedPeak)}`);
    console.log(`Reduction     : ${sign(reduction)}  (${((reduction / currentPeak) * 100).toFixed(1)}%)`);
    sep();
  }
})();

Output:
[image: output of the memory comparison script]

Tested in NFT:
[image: NFT test results]

https://nhsd-confluence.digital.nhs.uk/spaces/RIS/pages/1386363591/28-05-2026+CCM-18337

Type of changes

  • Refactoring (non-breaking change)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would change existing functionality)
  • Bug fix (non-breaking change which fixes an issue)

Checklist

  • I am familiar with the contributing guidelines
  • I have followed the code style of the project
  • I have added tests to cover my changes
  • I have updated the documentation accordingly
  • This PR is a result of pair or mob programming

Sensitive Information Declaration

To ensure the utmost confidentiality and protect your privacy and that of others, we kindly ask you NOT to include PII (Personal Identifiable Information) / PID (Personal Identifiable Data) or any other sensitive data in this PR (Pull Request) or the codebase changes. We will remove any PR that contains sensitive information. We really appreciate your cooperation in this matter.

  • I confirm that neither PII/PID nor sensitive data are included in this PR and the codebase changes.

@simonlabarere simonlabarere requested review from a team as code owners May 28, 2026 09:45
@simonlabarere simonlabarere merged commit aef9bb1 into main Jun 1, 2026
120 of 123 checks passed
@simonlabarere simonlabarere deleted the feature/CCM-18337_oom_pdm branch June 1, 2026 09:33

5 participants