Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions test/base-queue-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ describe('Test baseQueueHandler', function () {
DemoHandler.prototype.afterDlq = originalAfterDlq;
});

it('should add raw buffer to dlq when afterDlq throws to prevent double-encoding', async function () {
it('should not duplicate dlq message when afterDlq throws after successful publish', async function () {
const handler = new DemoHandler(this.name, rabbit, {
retries: 2,
retries: 1,
retryDelay: 10
});
await handler.created;
Expand All @@ -207,19 +207,49 @@ describe('Test baseQueueHandler', function () {

await rabbit.publish(this.name, { test: 'data' }, { correlationId: '3' });
const publish = sandbox.spy(handler.rabbit, 'publish');
await new Promise(resolve => setTimeout(resolve, 300));
await new Promise(resolve => setTimeout(resolve, 100));

afterDlq.calledOnce.should.be.true();
publish.calledOnce.should.be.true();
dlqMessages.length.should.equal(1);
dlqMessages[0].event.should.eql({ test: 'data' });
});

it('should publish raw buffer to dlq when first publish fails', async function () {
const handler = new DemoHandler(this.name, rabbit, {
retries: 1,
retryDelay: 10
});
await handler.created;
handler.handle = sandbox.spy(() => {
throw new Error('test error');
});
const afterDlq = (handler.afterDlq = sandbox.spy());

const dlqMessages: any[] = [];
await rabbit.subscribe(this.name + '_dlq', (msg, ack) => {
dlqMessages.push({ event: JSON.parse(msg.content.toString()), content: msg.content });
ack(null);
});

await rabbit.publish(this.name, { test: 'data' }, { correlationId: '5' });
const originalPublish = handler.rabbit.publish.bind(handler.rabbit);
let publishCallCount = 0;
const publish = sandbox.stub(handler.rabbit, 'publish').callsFake((...args) => {
publishCallCount++;
if (publishCallCount === 1) {
return Promise.reject(new Error('broker rejected'));
}
return originalPublish(...args);
});
await new Promise(resolve => setTimeout(resolve, 100));

publish.calledTwice.should.be.true();
const fallbackPayload = publish.args[publish.callCount - 1][1];
const fallbackPayload = publish.args[1][1];
Buffer.isBuffer(fallbackPayload).should.be.true();
fallbackPayload.toString().should.eql('{"test":"data"}');

// TODO: This flow also produces duplicate messages on the DLQ,
// to be investigated and handled on the next PR
dlqMessages.length.should.equal(2);
dlqMessages.length.should.equal(1);
dlqMessages[0].event.should.eql({ test: 'data' });
dlqMessages[1].event.should.eql({ test: 'data' });
});

it('should add to dlq after x retries and get error response even if afterDlq throws error', async function () {
Expand Down
11 changes: 9 additions & 2 deletions ts/base-queue-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,22 @@ abstract class BaseQueueHandler {

async addToDLQ(retries, msg: amqp.Message, ack) {
const correlationId = this.getCorrelationId(msg);
let dlqPublished = false;

try {
const event = decode(msg);
this.logger.warn(`[${correlationId}] Adding to dlq: ${this.dlqName} after ${retries} retries`);
await this.rabbit.publish(this.dlqName, event, msg.properties);
dlqPublished = true;
const response = await this.afterDlq({ msg, event });
ack(msg.properties.headers.errors.message, response);
} catch (err) {
this.logger.error(`[${correlationId}] Failed to add to dlq: ${this.dlqName}`, err);
await this.rabbit.publish(this.dlqName, msg.content, msg.properties);
if (dlqPublished) {
this.logger.error(`[${correlationId}] afterDlq failed after publishing to dlq: ${this.dlqName}`, err);
} else {
this.logger.error(`[${correlationId}] Failed to add to dlq: ${this.dlqName}`, err);
await this.rabbit.publish(this.dlqName, msg.content, msg.properties);
}
ack(err.message, null);
}
}
Expand Down