Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 98 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,62 @@ const deploy = async (name,
// console.log(data.body.status.phase);
} catch (err) {
log('error', err);
throw err; // let the consumer catch block handle it
}
};



const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROLLER_QUEUE';
const dlqName = process.env.GLOBAL_TXN_CONTROLLER_DLQ || 'GLOBAL_TXN_CONTROLLER_DLQ';
const MAX_DLQ_RETRIES = parseInt(process.env.MAX_DLQ_RETRIES || '5');
const DLQ_DRAIN_INTERVAL_MS = parseInt(process.env.DLQ_DRAIN_INTERVAL_MS || '300000'); // 5 min

// Send message to DLQ, preserving original content. Tracks retry count in headers.
const sendToDLQ = (channel, message, errorReason) => {
const retryCount = message.properties.headers?.['x-dlq-retry-count'] || 0;
if (retryCount >= MAX_DLQ_RETRIES) {
log('error', `Message permanently discarded after ${MAX_DLQ_RETRIES} DLQ retries. Reason: ${errorReason}`);
return;
}
channel.sendToQueue(dlqName, message.content, {
persistent: true,
headers: {
...message.properties.headers,
'x-dlq-retry-count': retryCount + 1,
'x-dlq-reason': String(errorReason).slice(0, 500),
'x-dlq-entered-at': new Date().toISOString()
}
});
log('warn', `Message sent to DLQ (attempt ${retryCount + 1}/${MAX_DLQ_RETRIES}): ${errorReason}`);
};

// Drain DLQ by republishing messages back to the main queue in their original format.
const drainDLQ = async (channel) => {
let count = 0;
try {
while (true) {
const msg = await channel.get(dlqName, { noAck: false });
if (!msg) break;
const retryCount = msg.properties.headers?.['x-dlq-retry-count'] || 0;
if (retryCount >= MAX_DLQ_RETRIES) {
log('error', `Permanently discarding DLQ message after ${retryCount} retries`);
channel.ack(msg);
continue;
}
// Republish original content back to main queue — consumer will process it normally
channel.sendToQueue(queueName, msg.content, {
persistent: false,
headers: msg.properties.headers
});
channel.ack(msg);
count++;
}
} catch (err) {
log('error', `DLQ drain error: ${err.message}`);
}
if (count > 0) log('info', `DLQ drained: ${count} message(s) republished to main queue`);
};

(async () => {
try {
Expand All @@ -197,11 +247,42 @@ const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROL
const connection = await amqp.connect(process.env.AMQ_URL, {
heartbeat: 30
})

let channelOpen = true;

connection.on('error', (err) => {
log('error', `AMQP connection error: ${err.message}`);
});
connection.on('close', () => {
log('error', 'AMQP connection closed unexpectedly. Exiting for restart...');
channelOpen = false;
clearInterval(drainInterval);
process.exit(1);
});

const channel = await connection.createChannel();
await channel.assertQueue(queueName, {
durable: false,

})
channel.on('error', (err) => {
log('error', `AMQP channel error: ${err.message}`);
channelOpen = false;
});
channel.on('close', () => {
log('error', 'AMQP channel closed. Exiting for restart...');
channelOpen = false;
clearInterval(drainInterval);
process.exit(1);
});

await channel.assertQueue(queueName, { durable: false });
await channel.assertQueue(dlqName, { durable: true });
const drainInterval = setInterval(() => {
if (!channelOpen) {
log('warn', 'Skipping DLQ drain: channel is not open');
return;
}
drainDLQ(channel);
}, DLQ_DRAIN_INTERVAL_MS);
log('info', `DLQ drain scheduled every ${DLQ_DRAIN_INTERVAL_MS / 1000}s`);
await channel.consume(queueName, async (message) => {
let queueMsg;
log('debug', 'Trying to consume')
Expand Down Expand Up @@ -241,18 +322,25 @@ const queueName = process.env.GLOBAL_TXN_CONTROLLER_QUEUE || 'GLOBAL_TXN_CONTROL

} catch (error) {
log('error', error.message);
channel.nack(message, false, false)

if (channelOpen) {
sendToDLQ(channel, message, error.message);
channel.ack(message);
} else {
log('warn', 'Channel closed during message processing; message will be requeued on restart');
}
}

})

process.on('SIGINT', async () => {
log('info', 'Closing RabbitMQ connection...');
await channel.close();
await connection.close();
const shutdown = async (signal) => {
log('info', `${signal} received, shutting down gracefully...`);
clearInterval(drainInterval);
try { await channel.close(); } catch (_) {}
try { await connection.close(); } catch (_) {}
process.exit(0);
});
};
process.on('SIGTERM', () => shutdown('SIGTERM')); // k8s sends this
process.on('SIGINT', () => shutdown('SIGINT')); // local dev Ctrl+C
} catch (error) {
log('error', error.message)
}
Expand Down
Loading