From 65370704bd76355529fcd922c26d3c2fbd1d54aa Mon Sep 17 00:00:00 2001 From: Pratap2018 Date: Wed, 20 May 2026 12:13:48 +0530 Subject: [PATCH 1/2] Dead Letter Queue --- src/index.js | 75 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 11 deletions(-) diff --git a/src/index.js b/src/index.js index 82fb9da..72914ab 100644 --- a/src/index.js +++ b/src/index.js @@ -181,12 +181,62 @@ const deploy = async (name, // console.log(data.body.status.phase); } catch (err) { log('error', err); + throw err; // let the consumer catch block handle it } }; const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROLLER_QUEUE'; +const dlqName = process.env.GLOBAL_TXN_CONTROLLER_DLQ || 'GLOBAL_TXN_CONTROLLER_DLQ'; +const MAX_DLQ_RETRIES = parseInt(process.env.MAX_DLQ_RETRIES || '5'); +const DLQ_DRAIN_INTERVAL_MS = parseInt(process.env.DLQ_DRAIN_INTERVAL_MS || '300000'); // 5 min + +// Send message to DLQ, preserving original content. Tracks retry count in headers. +const sendToDLQ = (channel, message, errorReason) => { + const retryCount = message.properties.headers?.['x-dlq-retry-count'] || 0; + if (retryCount >= MAX_DLQ_RETRIES) { + log('error', `Message permanently discarded after ${MAX_DLQ_RETRIES} DLQ retries. Reason: ${errorReason}`); + return; + } + channel.sendToQueue(dlqName, message.content, { + persistent: true, + headers: { + ...message.properties.headers, + 'x-dlq-retry-count': retryCount + 1, + 'x-dlq-reason': String(errorReason).slice(0, 500), + 'x-dlq-entered-at': new Date().toISOString() + } + }); + log('warn', `Message sent to DLQ (attempt ${retryCount + 1}/${MAX_DLQ_RETRIES}): ${errorReason}`); +}; + +// Drain DLQ by republishing messages back to the main queue in their original format. +const drainDLQ = async (channel) => { + let count = 0; + try { + while (true) { + const msg = await channel.get(dlqName, { noAck: false }); + if (!msg) break; + const retryCount = msg.properties.headers?.['x-dlq-retry-count'] || 0; + if (retryCount >= MAX_DLQ_RETRIES) { + log('error', `Permanently discarding DLQ message after ${retryCount} retries`); + channel.ack(msg); + continue; + } + // Republish original content back to main queue — consumer will process it normally + channel.sendToQueue(queueName, msg.content, { + persistent: false, + headers: msg.properties.headers + }); + channel.ack(msg); + count++; + } + } catch (err) { + log('error', `DLQ drain error: ${err.message}`); + } + if (count > 0) log('info', `DLQ drained: ${count} message(s) republished to main queue`); +}; (async () => { try { @@ -198,10 +248,10 @@ const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROL heartbeat: 30 }) const channel = await connection.createChannel(); - await channel.assertQueue(queueName, { - durable: false, - - }) + await channel.assertQueue(queueName, { durable: false }); + await channel.assertQueue(dlqName, { durable: true }); + const drainInterval = setInterval(() => drainDLQ(channel), DLQ_DRAIN_INTERVAL_MS); + log('info', `DLQ drain scheduled every ${DLQ_DRAIN_INTERVAL_MS / 1000}s`); await channel.consume(queueName, async (message) => { let queueMsg; log('debug', 'Trying to consume') @@ -241,18 +291,21 @@ const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROL } catch (error) { log('error', error.message); - channel.nack(message, false, false) - + sendToDLQ(channel, message, error.message); + channel.ack(message); } }) - process.on('SIGINT', async () => { - log('info', 'Closing RabbitMQ connection...'); - await channel.close(); - await connection.close(); + const shutdown = async (signal) => { + log('info', `${signal} received, shutting down gracefully...`); + clearInterval(drainInterval); + try { await channel.close(); } catch (_) {} + try { await connection.close(); } catch (_) {} process.exit(0); - }); + }; + process.on('SIGTERM', () => shutdown('SIGTERM')); // k8s sends this + process.on('SIGINT', () => shutdown('SIGINT')); // local dev Ctrl+C } catch (error) { log('error', error.message) } From adcb88bbfae8fc3b2a0711f52fa257f8f1958073 Mon Sep 17 00:00:00 2001 From: Pratap2018 Date: Fri, 22 May 2026 10:20:58 +0530 Subject: [PATCH 2/2] [CHORE]: Enhance DLQ handling with error logging and channel state checks --- src/index.js | 41 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/index.js b/src/index.js index 72914ab..6c797f6 100644 --- a/src/index.js +++ b/src/index.js @@ -247,10 +247,41 @@ const drainDLQ = async (channel) => { const connection = await amqp.connect(process.env.AMQ_URL, { heartbeat: 30 }) + + let channelOpen = true; + + connection.on('error', (err) => { + log('error', `AMQP connection error: ${err.message}`); + }); + connection.on('close', () => { + log('error', 'AMQP connection closed unexpectedly. Exiting for restart...'); + channelOpen = false; + clearInterval(drainInterval); + process.exit(1); + }); + const channel = await connection.createChannel(); + + channel.on('error', (err) => { + log('error', `AMQP channel error: ${err.message}`); + channelOpen = false; + }); + channel.on('close', () => { + log('error', 'AMQP channel closed. Exiting for restart...'); + channelOpen = false; + clearInterval(drainInterval); + process.exit(1); + }); + await channel.assertQueue(queueName, { durable: false }); await channel.assertQueue(dlqName, { durable: true }); - const drainInterval = setInterval(() => drainDLQ(channel), DLQ_DRAIN_INTERVAL_MS); + const drainInterval = setInterval(() => { + if (!channelOpen) { + log('warn', 'Skipping DLQ drain: channel is not open'); + return; + } + drainDLQ(channel); + }, DLQ_DRAIN_INTERVAL_MS); log('info', `DLQ drain scheduled every ${DLQ_DRAIN_INTERVAL_MS / 1000}s`); await channel.consume(queueName, async (message) => { let queueMsg; @@ -291,8 +322,12 @@ const drainDLQ = async (channel) => { } catch (error) { log('error', error.message); - sendToDLQ(channel, message, error.message); - channel.ack(message); + if (channelOpen) { + sendToDLQ(channel, message, error.message); + channel.ack(message); + } else { + log('warn', 'Channel closed during message processing; message will be requeued on restart'); + } } })