Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ def lambda_handler(event, context: LambdaContext):
None,
)

def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error: bool = False):
def __init__(
self,
model: BatchSqsTypeModel | None = None,
skip_group_on_error: bool = False,
raise_on_entire_batch_failure: bool = True,
):
"""
Initialize the SqsFifoProcessor.

Expand All @@ -77,12 +82,15 @@ def __init__(self, model: BatchSqsTypeModel | None = None, skip_group_on_error:
skip_group_on_error: bool
Determines whether to exclusively skip messages from the MessageGroupID that encountered processing failures
Default is False.
raise_on_entire_batch_failure: bool
Raise an exception when the entire batch has failed processing.
When set to False, partial failures are reported in the response.

"""
self._skip_group_on_error: bool = skip_group_on_error
self._current_group_id = None
self._failed_group_ids: set[str] = set()
super().__init__(EventType.SQS, model)
super().__init__(EventType.SQS, model, raise_on_entire_batch_failure)

def _process_record(self, record):
self._current_group_id = record.get("attributes", {}).get("MessageGroupId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,24 @@ def lambda_handler(event, context):
assert result["batchItemFailures"][1]["itemIdentifier"] == third_record.message_id


def test_sqs_fifo_batch_processor_not_raise_when_entire_batch_fails(sqs_event_fifo_factory, record_handler):
first_record = SQSRecord(sqs_event_fifo_factory("fail"))
second_record = SQSRecord(sqs_event_fifo_factory("success"))
event = {"Records": [first_record.raw_event, second_record.raw_event]}

processor = SqsFifoPartialProcessor(raise_on_entire_batch_failure=False)

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
return processor.response()

response = lambda_handler(event, {})

assert len(response["batchItemFailures"]) == 2
assert response["batchItemFailures"][0]["itemIdentifier"] == first_record.message_id
assert response["batchItemFailures"][1]["itemIdentifier"] == second_record.message_id


def test_sqs_fifo_batch_processor_middleware_with_skip_group_on_error(sqs_event_fifo_factory, record_handler):
# GIVEN a batch of 5 records with 3 different MessageGroupID
first_record = SQSRecord(sqs_event_fifo_factory("success", "1"))
Expand Down