Skip to content

feat: Kafka DLT Consumer 재처리 구현 (#97)#98

Merged
KTH1007 merged 2 commits into
developfrom
feat/#97-kafka-dlt-consumer
May 22, 2026
Merged

feat: Kafka DLT Consumer 재처리 구현 (#97)#98
KTH1007 merged 2 commits into
developfrom
feat/#97-kafka-dlt-consumer

Conversation

@KTH1007

@KTH1007 KTH1007 commented May 22, 2026

Copy link
Copy Markdown
Owner

관련 이슈

closes #97

작업 내용

  • DLT에 들어온 토픽 재처리
  • DLT 재처리 무한루프 방지

테스트

  • 로컬 테스트 완료

참고 사항

@KTH1007 KTH1007 self-assigned this May 22, 2026
@KTH1007 KTH1007 added the feat label May 22, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a retry and failure management system for Kafka-based notifications and post synchronization. It introduces Dead Letter Topic (DLT) consumers that attempt to re-process messages up to a defined limit before persisting them to a failure storage in the database. New entities and repositories were added to track these failed events. The review feedback highlights critical improvements for the DLT consumers, specifically the need for exception handling during JSON parsing to prevent partition blocking, the addition of @transactional for database consistency, and ensuring that asynchronous Kafka sends are properly completed before acknowledging messages to prevent data loss.

@KafkaListener(topics = KafkaConstants.NOTIFICATION_DLT_TOPIC, groupId = KafkaConstants.NOTIFICATION_DLT_GROUP)
public void consume(String payload, Acknowledgment ack,
@Header(name = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage) {
NotificationEvent event = objectMapper.readValue(payload, NotificationEvent.class);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

objectMapper.readValue 호출 시 페이로드가 유효하지 않은 JSON이거나 DTO 구조와 일치하지 않을 경우 예외가 발생합니다. DLT 컨슈머는 재처리 파이프라인의 마지막 단계이므로, 여기서 발생하는 예외로 인해 컨슈머가 무한 재시도 루프에 빠져 파티션이 차단(blocking)되지 않도록 try-catch 등을 통한 예외 처리가 필요합니다. 파싱 실패 시 로그를 남기고 해당 메시지를 무시(ack)하거나 별도의 실패 저장소에 기록하는 로직을 고려해 보세요.

public void consume(String payload, Acknowledgment ack,
@Header(name = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage) {
log.error("DLQ 수신 - payload: {}, 원인: {}", payload, exceptionMessage);
PostSyncEvent event = objectMapper.readValue(payload, PostSyncEvent.class);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

유효하지 않은 페이로드 수신 시 objectMapper.readValue에서 발생하는 예외가 컨슈머의 무한 재시도를 유발할 수 있습니다. DLT 컨슈머의 안정성을 위해 파싱 에러에 대한 예외 처리 로직을 추가하여 포이즌 메시지(Poison Message)로 인한 장애를 방지해야 합니다.

private final FailedNotificationRepository failedNotificationRepository;

@KafkaListener(topics = KafkaConstants.NOTIFICATION_DLT_TOPIC, groupId = KafkaConstants.NOTIFICATION_DLT_GROUP)
public void consume(String payload, Acknowledgment ack,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

JPA Repository를 사용하여 데이터를 저장하는 로직이 포함되어 있으므로, 데이터 일관성과 영속성 컨텍스트의 안정적인 관리를 위해 @Transactional 어노테이션을 추가하는 것을 권장합니다.

event.receiverId(), event.type(), event.retryCount(), exceptionMessage);

if (event.retryCount() < KafkaConstants.MAX_DLT_RETRY) {
kafkaTemplate.send(KafkaConstants.NOTIFICATION_TOPIC, objectMapper.writeValueAsString(event.withRetry()));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

kafkaTemplate.send()는 비동기로 동작합니다. 메시지 재투입 후 즉시 ack.acknowledge()를 호출하면, 실제 전송이 실패하더라도 DLT 메시지는 처리된 것으로 간주되어 메시지 유실이 발생할 수 있습니다. 재투입의 확실한 보장을 위해 get()을 사용하여 전송 결과를 동기적으로 확인하거나, 콜백을 사용하여 성공 시에만 ack를 호출하는 것이 안전합니다.

private final FailedPostSyncRepository failedPostSyncRepository;

@KafkaListener(topics = KafkaConstants.POST_SYNC_DLT_TOPIC, groupId = KafkaConstants.POST_SYNC_DLT_GROUP)
public void consume(String payload, Acknowledgment ack,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

failedPostSyncRepository.save()를 통한 DB 저장 로직의 트랜잭션 보장과 영속성 컨텍스트 관리를 위해 @Transactional 어노테이션 추가가 필요합니다.

event.postId(), event.operationType(), event.retryCount(), exceptionMessage);

if (event.retryCount() < KafkaConstants.MAX_DLT_RETRY) {
kafkaTemplate.send(KafkaConstants.POST_SYNC_TOPIC, objectMapper.writeValueAsString(event.withRetry()));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

비동기 전송 방식인 kafkaTemplate.send() 사용 시, 전송 완료 전에 ack가 수행되면 전송 실패 상황에서 메시지가 유실될 위험이 있습니다. 동기 방식(get())으로 결과를 확인하거나 성공 콜백을 통해 ack를 제어하는 방식을 권장합니다.

@KTH1007 KTH1007 merged commit 6539fe9 into develop May 22, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: Kafka DLT Consumer 재처리 구현

1 participant