From 8762643df4971051cd5151e731047dbd15acbe8a Mon Sep 17 00:00:00 2001 From: janosbabik Date: Mon, 27 Apr 2026 13:00:26 +0200 Subject: [PATCH] feat: update message-broker to version 1.9.0 and add prefetch option to listenOn method --- package-lock.json | 2 +- packages/message-broker/package.json | 2 +- packages/message-broker/src/consumer.ts | 6 ++++-- packages/message-broker/src/index.spec.ts | 20 ++++++++++++++++++++ packages/message-broker/src/index.ts | 22 +++++++++++++++++++--- 5 files changed, 45 insertions(+), 7 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2c6ec43..8775b90 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18788,7 +18788,7 @@ }, "packages/message-broker": { "name": "@user-office-software/duo-message-broker", - "version": "1.8.0", + "version": "1.9.0", "license": "ISC", "dependencies": { "@types/amqplib": "^0.10.1", diff --git a/packages/message-broker/package.json b/packages/message-broker/package.json index 9b26d93..47b8aa0 100644 --- a/packages/message-broker/package.json +++ b/packages/message-broker/package.json @@ -1,6 +1,6 @@ { "name": "@user-office-software/duo-message-broker", - "version": "1.8.0", + "version": "1.9.0", "description": "", "author": "SWAP", "license": "ISC", diff --git a/packages/message-broker/src/consumer.ts b/packages/message-broker/src/consumer.ts index d51e37c..997830c 100644 --- a/packages/message-broker/src/consumer.ts +++ b/packages/message-broker/src/consumer.ts @@ -1,13 +1,15 @@ -import { ConsumerCallback } from './index'; +import { ConsumerCallback, ListenOnOptions } from './index'; // This class is used to store the callback function and whether it is registered or not. export class Consumer { callback: ConsumerCallback; registered: boolean; + options: ListenOnOptions; - constructor(callback: ConsumerCallback) { + constructor(callback: ConsumerCallback, options: ListenOnOptions = {}) { this.callback = callback; this.registered = false; + this.options = options; } register() { diff --git a/packages/message-broker/src/index.spec.ts b/packages/message-broker/src/index.spec.ts index a386753..214e4cf 100644 --- a/packages/message-broker/src/index.spec.ts +++ b/packages/message-broker/src/index.spec.ts @@ -21,6 +21,7 @@ describe('RabbitMQMessageBroker', () => { bindQueue: jest.fn(), publish: jest.fn(), consume: jest.fn().mockResolvedValue({}), + prefetch: jest.fn(), on: jest.fn(), }; @@ -109,6 +110,25 @@ describe('RabbitMQMessageBroker', () => { await broker.listenOn(Queue.SCHEDULING_PROPOSAL, consumer); expect(mockAmqpChannel.consume).toHaveBeenCalledTimes(2); }); + + it('should call channel.prefetch with the given value before consume when prefetch option is set', async () => { + await broker.listenOn(Queue.SCHEDULING_PROPOSAL, consumer, { + prefetch: 10, + }); + + expect(mockAmqpChannel.prefetch).toHaveBeenCalledWith(10, false); + const prefetchOrder = (mockAmqpChannel.prefetch as jest.Mock).mock + .invocationCallOrder[0]; + const consumeOrder = (mockAmqpChannel.consume as jest.Mock).mock + .invocationCallOrder[0]; + expect(prefetchOrder).toBeLessThan(consumeOrder); + }); + + it('should not call channel.prefetch when prefetch option is not set', async () => { + await broker.listenOn(Queue.SCHEDULING_PROPOSAL, consumer); + + expect(mockAmqpChannel.prefetch).not.toHaveBeenCalled(); + }); }); describe('scheduleReconnect', () => { diff --git a/packages/message-broker/src/index.ts b/packages/message-broker/src/index.ts index 1cffe45..d34f84f 100644 --- a/packages/message-broker/src/index.ts +++ b/packages/message-broker/src/index.ts @@ -24,6 +24,10 @@ export type ConsumerCallback = ( properties: MessageProperties ) => Promise; +export type ListenOnOptions = { + prefetch?: number; +}; + export interface MessageBroker { sendMessage(queue: Queue, type: string, message: string): Promise; sendBroadcast(queue: Queue, type: string, message: string): Promise; @@ -36,7 +40,11 @@ export interface MessageBroker { queueName: string, exchangeName: string ): Promise; - listenOn(queue: Queue, cb: ConsumerCallback): Promise; + listenOn( + queue: Queue, + cb: ConsumerCallback, + options?: ListenOnOptions + ): Promise; listenOnBroadcast(cb: ConsumerCallback): void; } @@ -110,12 +118,16 @@ export class RabbitMQMessageBroker implements MessageBroker { } } - async listenOn(queue: Queue, cb: ConsumerCallback) { + async listenOn( + queue: Queue, + cb: ConsumerCallback, + options: ListenOnOptions = {} + ) { if (!this.queueConsumers.has(queue)) { this.queueConsumers.set(queue, []); } - this.queueConsumers.get(queue)?.push(new Consumer(cb)); + this.queueConsumers.get(queue)?.push(new Consumer(cb, options)); if (this.channel) { await this.registerConsumers(); @@ -385,6 +397,10 @@ export class RabbitMQMessageBroker implements MessageBroker { consumer.register(); } + if (consumer.options.prefetch !== undefined) { + await this.channel.prefetch(consumer.options.prefetch, false); + } + await this.channel .consume( queue,