@@ -13,7 +13,6 @@ import {
1313} from '@aws-sdk/client-sqs' ;
1414import { NodeHttpHandler } from '@smithy/node-http-handler' ;
1515import alai from 'alai' ;
16- import { each } from 'async' ;
1716import { v4 as uuid } from 'uuid' ;
1817
1918import DependencyAwareClass from '../core/DependencyAwareClass' ;
@@ -316,111 +315,92 @@ export default class SQSService<
316315 * @param queue
317316 * @param messageModels
318317 */
319- batchDelete ( queue : QueueName < TConfig > , messageModels : SQSMessageModel [ ] ) : Promise < void > {
318+ async batchDelete ( queue : QueueName < TConfig > , messageModels : SQSMessageModel [ ] ) : Promise < void > {
320319 const queueUrl = this . queueUrls [ queue ] ;
321320 const logger = this . di . get ( LoggerService ) ;
322321 const timer = this . di . get ( TimerService ) ;
322+
323323 const timerId = `sqs-batch-delete-${ uuid ( ) } - Queue: '${ queueUrl } '` ;
324+ timer . start ( timerId ) ;
324325
325- return new Promise < void > ( ( resolve ) => {
326- const messagesForDeletion : { Id : string ; ReceiptHandle : string } [ ] = [ ] ;
327-
328- timer . start ( timerId ) ;
329- // assuming openFiles is an array of file names
330- each (
331- messageModels ,
332- ( messageModel , callback ) => {
333- if ( messageModel instanceof SQSMessageModel && messageModel . isForDeletion ( ) === true ) {
334- messagesForDeletion . push ( {
335- Id : messageModel . getMessageId ( ) ,
336- ReceiptHandle : messageModel . getReceiptHandle ( ) ,
337- } ) ;
338- }
339- callback ( ) ;
340- } ,
341- ( loopError ) => {
342- if ( loopError ) {
343- logger . error ( loopError ) ;
344- resolve ( ) ;
345- }
346-
347- this . sqs . send ( new DeleteMessageBatchCommand ( {
348- Entries : messagesForDeletion ,
349- QueueUrl : queueUrl ,
350- } ) ) . finally ( ( ) => {
351- timer . stop ( timerId ) ;
352- } ) . catch ( ( error ) => {
353- logger . error ( error ) ;
354- } ) . then ( ( ) => {
355- resolve ( ) ;
356- } ) ;
357- } ,
358- ) ;
359- } ) ;
326+ const messagesForDeletion = messageModels
327+ . filter ( ( messageModel ) => (
328+ messageModel instanceof SQSMessageModel
329+ && messageModel . isForDeletion ( )
330+ ) )
331+ . map ( ( messageModel ) => ( {
332+ Id : messageModel . getMessageId ( ) ,
333+ ReceiptHandle : messageModel . getReceiptHandle ( ) ,
334+ } ) ) ;
335+
336+ try {
337+ await this . sqs . send ( new DeleteMessageBatchCommand ( {
338+ Entries : messagesForDeletion ,
339+ QueueUrl : queueUrl ,
340+ } ) ) ;
341+ } catch ( error ) {
342+ logger . error ( error ) ;
343+ } finally {
344+ timer . stop ( timerId ) ;
345+ }
360346 }
361347
362348 /**
363349 * Check SQS status.
364350 */
365- checkStatus ( ) : Promise < ServiceStatus > {
351+ async checkStatus ( ) : Promise < ServiceStatus > {
366352 const logger = this . di . get ( LoggerService ) ;
367353 const timer = this . di . get ( TimerService ) ;
354+
368355 const timerId = `sqs-list-queues-${ uuid ( ) } ` ;
356+ timer . start ( timerId ) ;
369357
370- return new Promise ( ( resolve ) => {
371- timer . start ( timerId ) ;
372-
373- let status : Status = 'OK' ;
374-
375- this . sqs . send ( new ListQueuesCommand ( ) )
376- . finally ( ( ) => {
377- timer . stop ( timerId ) ;
378- } )
379- . then ( ( data ) => {
380- if ( typeof data . QueueUrls === 'undefined' || data . QueueUrls . length === 0 ) {
381- status = 'APPLICATION_FAILURE' ;
382- }
383- } )
384- . catch ( ( error ) => {
385- logger . error ( error ) ;
386- status = 'APPLICATION_FAILURE' ;
387- } )
388- . then ( ( ) => {
389- resolve ( {
390- service : 'SQS' ,
391- status,
392- } ) ;
393- } ) ;
394- } ) ;
358+ let status : Status = 'OK' ;
359+ try {
360+ const result = await this . sqs . send ( new ListQueuesCommand ( ) ) ;
361+ if ( typeof result . QueueUrls === 'undefined' || result . QueueUrls . length === 0 ) {
362+ status = 'APPLICATION_FAILURE' ;
363+ }
364+ } catch ( error ) {
365+ logger . error ( error ) ;
366+ status = 'APPLICATION_FAILURE' ;
367+ } finally {
368+ timer . stop ( timerId ) ;
369+ }
370+
371+ return {
372+ service : 'SQS' ,
373+ status,
374+ } ;
395375 }
396376
397377 /**
398378 * Get the approximate number of messages in a queue.
399379 *
400380 * @param queue
401381 */
402- getMessageCount ( queue : QueueName < TConfig > ) : Promise < number > {
382+ async getMessageCount ( queue : QueueName < TConfig > ) : Promise < number > {
403383 const queueUrl = this . queueUrls [ queue ] ;
404384 const logger = this . di . get ( LoggerService ) ;
405385 const timer = this . di . get ( TimerService ) ;
406- const timerId = `sqs-get-queue-attributes-${ uuid ( ) } - Queue: '${ queueUrl } '` ;
407386
408- return new Promise ( ( resolve ) => {
409- timer . start ( timerId ) ;
387+ const timerId = `sqs-get-queue-attributes- ${ uuid ( ) } - Queue: ' ${ queueUrl } '` ;
388+ timer . start ( timerId ) ;
410389
411- this . sqs . send ( new GetQueueAttributesCommand ( {
390+ try {
391+ const result = await this . sqs . send ( new GetQueueAttributesCommand ( {
412392 AttributeNames : [ 'ApproximateNumberOfMessages' ] ,
413393 QueueUrl : queueUrl ,
414- } ) ) . finally ( ( ) => {
415- timer . stop ( timerId ) ;
416- } ) . then ( ( data ) => {
417- const messageCount = data . Attributes ?. ApproximateNumberOfMessages || '0' ;
418- resolve ( Number . parseInt ( messageCount , 10 ) ) ;
419- } ) . catch ( ( error ) => {
420- logger . error ( error ) ;
421- resolve ( 0 ) ;
422- } ) ;
423- } ) ;
394+ } ) ) ;
395+
396+ const messageCount = result . Attributes ?. ApproximateNumberOfMessages || '0' ;
397+ return Number . parseInt ( messageCount , 10 ) ;
398+ } catch ( error ) {
399+ logger . error ( error ) ;
400+ return 0 ;
401+ } finally {
402+ timer . stop ( timerId ) ;
403+ }
424404 }
425405
426406 /**
@@ -530,31 +510,31 @@ export default class SQSService<
530510 * @param queue string
531511 * @param timeout number
532512 */
533- receive ( queue : QueueName < TConfig > , timeout = 15 ) : Promise < SQSMessageModel [ ] > {
513+ async receive ( queue : QueueName < TConfig > , timeout = 15 ) : Promise < SQSMessageModel [ ] > {
534514 const queueUrl = this . queueUrls [ queue ] ;
535515 const logger = this . di . get ( LoggerService ) ;
536516 const timer = this . di . get ( TimerService ) ;
537- const timerId = `sqs-receive-message-${ uuid ( ) } - Queue: '${ queueUrl } '` ;
538517
539- return new Promise ( ( resolve , reject ) => {
540- timer . start ( timerId ) ;
518+ const timerId = `sqs-receive-message- ${ uuid ( ) } - Queue: ' ${ queueUrl } '` ;
519+ timer . start ( timerId ) ;
541520
542- this . sqs . send ( new ReceiveMessageCommand ( {
521+ try {
522+ const result = await this . sqs . send ( new ReceiveMessageCommand ( {
543523 QueueUrl : queueUrl ,
544524 VisibilityTimeout : timeout ,
545525 MaxNumberOfMessages : 10 ,
546- } ) ) . finally ( ( ) => {
547- timer . stop ( timerId ) ;
548- } ) . then ( ( data ) => {
549- if ( typeof data . Messages === 'undefined' ) {
550- resolve ( [ ] ) ;
551- } else {
552- resolve ( data . Messages . map ( ( message ) => new SQSMessageModel ( message ) ) ) ;
553- }
554- } ) . catch ( ( error ) => {
555- logger . error ( error ) ;
556- reject ( error ) ;
557- } ) ;
558- } ) ;
526+ } ) ) ;
527+
528+ if ( typeof result . Messages === 'undefined' ) {
529+ return [ ] ;
530+ }
531+
532+ return result . Messages . map ( ( message ) => new SQSMessageModel ( message ) ) ;
533+ } catch ( error ) {
534+ logger . error ( error ) ;
535+ throw error ;
536+ } finally {
537+ timer . stop ( timerId ) ;
538+ }
559539 }
560540}
0 commit comments