From a5457af6f8f136e79b943594e4eeece124a4e6b1 Mon Sep 17 00:00:00 2001 From: Rohan Patnaik Date: Fri, 22 May 2026 13:31:17 +0530 Subject: [PATCH] feat(batch): support FIFO entire batch failure override --- .../batch/sqs_fifo_partial_processor.py | 12 ++++++++++-- .../test_utilities_batch.py | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py index 2e680e2f04e..060d086b57b 100644 --- a/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py +++ b/aws_lambda_powertools/utilities/batch/sqs_fifo_partial_processor.py @@ -66,7 +66,12 @@ def lambda_handler(event, context: LambdaContext): None, ) - def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error: bool = False): + def __init__( + self, + model: BatchSqsTypeModel | None = None, + skip_group_on_error: bool = False, + raise_on_entire_batch_failure: bool = True, + ): """ Initialize the SqsFifoProcessor. @@ -77,12 +82,15 @@ def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error: skip_group_on_error: bool Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures Default is False. + raise_on_entire_batch_failure: bool + Raise an exception when the entire batch has failed processing. + When set to False, partial failures are reported in the response. """ self._skip_group_on_error: bool = skip_group_on_error self._current_group_id = None self._failed_group_ids: set[str] = set() - super().__init__(EventType.SQS, model) + super().__init__(EventType.SQS, model, raise_on_entire_batch_failure) def _process_record(self, record): self._current_group_id = record.get("attributes", {}).get("MessageGroupId") diff --git a/tests/functional/batch/required_dependencies/test_utilities_batch.py b/tests/functional/batch/required_dependencies/test_utilities_batch.py index 43c2aa16191..a9c7b3280b4 100644 --- a/tests/functional/batch/required_dependencies/test_utilities_batch.py +++ b/tests/functional/batch/required_dependencies/test_utilities_batch.py @@ -499,6 +499,24 @@ def lambda_handler(event, context): assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id +def test_sqs_fifo_batch_processor_not_raise_when_entire_batch_fails(sqs_event_fifo_factory, record_handler): + first_record = SQSRecord(sqs_event_fifo_factory("fail")) + second_record = SQSRecord(sqs_event_fifo_factory("success")) + event = {"Records": [first_record.raw_event, second_record.raw_event]} + + processor = SqsFifoPartialProcessor(raise_on_entire_batch_failure=False) + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + response = lambda_handler(event, {}) + + assert len(response["batchItemFailures"]) == 2 + assert response["batchItemFailures"][0]["itemIdentifier"] == first_record.message_id + assert response["batchItemFailures"][1]["itemIdentifier"] == second_record.message_id + + def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error(sqs_event_fifo_factory, record_handler): # GIVEN a batch of 5 records with 3 different MessageGroupID first_record = SQSRecord(sqs_event_fifo_factory("success", "1"))