diff --git a/test/base-queue-handler.test.ts b/test/base-queue-handler.test.ts index e2c37d2..0a72dd8 100644 --- a/test/base-queue-handler.test.ts +++ b/test/base-queue-handler.test.ts @@ -183,9 +183,9 @@ describe('Test baseQueueHandler', function () { DemoHandler.prototype.afterDlq = originalAfterDlq; }); - it('should add raw buffer to dlq when afterDlq throws to prevent double-encoding', async function () { + it('should not duplicate dlq message when afterDlq throws after successful publish', async function () { const handler = new DemoHandler(this.name, rabbit, { - retries: 2, + retries: 1, retryDelay: 10 }); await handler.created; @@ -207,19 +207,49 @@ describe('Test baseQueueHandler', function () { await rabbit.publish(this.name, { test: 'data' }, { correlationId: '3' }); const publish = sandbox.spy(handler.rabbit, 'publish'); - await new Promise(resolve => setTimeout(resolve, 300)); + await new Promise(resolve => setTimeout(resolve, 100)); afterDlq.calledOnce.should.be.true(); + publish.calledOnce.should.be.true(); + dlqMessages.length.should.equal(1); + dlqMessages[0].event.should.eql({ test: 'data' }); + }); + + it('should publish raw buffer to dlq when first publish fails', async function () { + const handler = new DemoHandler(this.name, rabbit, { + retries: 1, + retryDelay: 10 + }); + await handler.created; + handler.handle = sandbox.spy(() => { + throw new Error('test error'); + }); + const afterDlq = (handler.afterDlq = sandbox.spy()); + + const dlqMessages: any[] = []; + await rabbit.subscribe(this.name + '_dlq', (msg, ack) => { + dlqMessages.push({ event: JSON.parse(msg.content.toString()), content: msg.content }); + ack(null); + }); + + await rabbit.publish(this.name, { test: 'data' }, { correlationId: '5' }); + const originalPublish = handler.rabbit.publish.bind(handler.rabbit); + let publishCallCount = 0; + const publish = sandbox.stub(handler.rabbit, 'publish').callsFake((...args) => { + publishCallCount++; + if (publishCallCount === 1) { + return Promise.reject(new Error('broker rejected')); + } + return originalPublish(...args); + }); + await new Promise(resolve => setTimeout(resolve, 100)); + publish.calledTwice.should.be.true(); - const fallbackPayload = publish.args[publish.callCount - 1][1]; + const fallbackPayload = publish.args[1][1]; Buffer.isBuffer(fallbackPayload).should.be.true(); fallbackPayload.toString().should.eql('{"test":"data"}'); - - // TODO: This flow also produces duplicate messages on the DLQ, - // to be investigated and handled on the next PR - dlqMessages.length.should.equal(2); + dlqMessages.length.should.equal(1); dlqMessages[0].event.should.eql({ test: 'data' }); - dlqMessages[1].event.should.eql({ test: 'data' }); }); it('should add to dlq after x retries and get error response even if afterDlq throws error', async function () { diff --git a/ts/base-queue-handler.ts b/ts/base-queue-handler.ts index 68c4fcd..6458a58 100644 --- a/ts/base-queue-handler.ts +++ b/ts/base-queue-handler.ts @@ -183,15 +183,22 @@ abstract class BaseQueueHandler { async addToDLQ(retries, msg: amqp.Message, ack) { const correlationId = this.getCorrelationId(msg); + let dlqPublished = false; + try { const event = decode(msg); this.logger.warn(`[${correlationId}] Adding to dlq: ${this.dlqName} after ${retries} retries`); await this.rabbit.publish(this.dlqName, event, msg.properties); + dlqPublished = true; const response = await this.afterDlq({ msg, event }); ack(msg.properties.headers.errors.message, response); } catch (err) { - this.logger.error(`[${correlationId}] Failed to add to dlq: ${this.dlqName}`, err); - await this.rabbit.publish(this.dlqName, msg.content, msg.properties); + if (dlqPublished) { + this.logger.error(`[${correlationId}] afterDlq failed after publishing to dlq: ${this.dlqName}`, err); + } else { + this.logger.error(`[${correlationId}] Failed to add to dlq: ${this.dlqName}`, err); + await this.rabbit.publish(this.dlqName, msg.content, msg.properties); + } ack(err.message, null); } }