feat: Kafka DLT Consumer 재처리 구현 (#97)#98
Conversation
There was a problem hiding this comment.
Code Review
This pull request implements a retry and failure management system for Kafka-based notifications and post synchronization. It introduces Dead Letter Topic (DLT) consumers that attempt to re-process messages up to a defined limit before persisting them to a failure storage in the database. New entities and repositories were added to track these failed events. The review feedback highlights critical improvements for the DLT consumers, specifically the need for exception handling during JSON parsing to prevent partition blocking, the addition of @transactional for database consistency, and ensuring that asynchronous Kafka sends are properly completed before acknowledging messages to prevent data loss.
| @KafkaListener(topics = KafkaConstants.NOTIFICATION_DLT_TOPIC, groupId = KafkaConstants.NOTIFICATION_DLT_GROUP) | ||
| public void consume(String payload, Acknowledgment ack, | ||
| @Header(name = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage) { | ||
| NotificationEvent event = objectMapper.readValue(payload, NotificationEvent.class); |
There was a problem hiding this comment.
| public void consume(String payload, Acknowledgment ack, | ||
| @Header(name = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage) { | ||
| log.error("DLQ 수신 - payload: {}, 원인: {}", payload, exceptionMessage); | ||
| PostSyncEvent event = objectMapper.readValue(payload, PostSyncEvent.class); |
| private final FailedNotificationRepository failedNotificationRepository; | ||
|
|
||
| @KafkaListener(topics = KafkaConstants.NOTIFICATION_DLT_TOPIC, groupId = KafkaConstants.NOTIFICATION_DLT_GROUP) | ||
| public void consume(String payload, Acknowledgment ack, |
| event.receiverId(), event.type(), event.retryCount(), exceptionMessage); | ||
|
|
||
| if (event.retryCount() < KafkaConstants.MAX_DLT_RETRY) { | ||
| kafkaTemplate.send(KafkaConstants.NOTIFICATION_TOPIC, objectMapper.writeValueAsString(event.withRetry())); |
| private final FailedPostSyncRepository failedPostSyncRepository; | ||
|
|
||
| @KafkaListener(topics = KafkaConstants.POST_SYNC_DLT_TOPIC, groupId = KafkaConstants.POST_SYNC_DLT_GROUP) | ||
| public void consume(String payload, Acknowledgment ack, |
| event.postId(), event.operationType(), event.retryCount(), exceptionMessage); | ||
|
|
||
| if (event.retryCount() < KafkaConstants.MAX_DLT_RETRY) { | ||
| kafkaTemplate.send(KafkaConstants.POST_SYNC_TOPIC, objectMapper.writeValueAsString(event.withRetry())); |
관련 이슈
closes #97
작업 내용
테스트
참고 사항