Add nonRetryableExceptions to your @CustomKafkaListener annotation:
package com.example.service;
import net.damero.Kafka.Annotations.DameroKafkaListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Service;
@Service
public class OrderProcessingService {
@DameroKafkaListener(
topic = "orders",
dlqTopic = "orders-dlq",
maxAttempts = 3,
delay = 1000,
delayMethod = DelayMethod.EXPO,
nonRetryableExceptions = {
IllegalArgumentException.class,
ValidationException.class,
UnsupportedOperationException.class
}
)
@KafkaListener(topics = "orders", groupId = "order-processor")
public void processOrder(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) {
OrderEvent order = record.value();
// This will be retried if it fails
if (order.getAmount() < 0) {
throw new IllegalArgumentException("Order amount cannot be negative");
}
// This will be retried if it fails
if (!isValidPayment(order)) {
throw new PaymentException("Payment validation failed");
}
// Process the order
orderService.process(order);
ack.acknowledge();
}
private boolean isValidPayment(OrderEvent order) {
// Simulate payment validation
return order.getPaymentMethod() != null;
}
}Send a message that triggers IllegalArgumentException:
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@PostMapping("/test-non-retryable")
public String testNonRetryable() {
OrderEvent badOrder = new OrderEvent();
badOrder.setAmount(-100); // This will trigger IllegalArgumentException
kafkaTemplate.send("orders", badOrder);
return "Sent bad order - should go directly to DLQ without retries";
}
}Expected Behavior:
- Message is processed once
IllegalArgumentExceptionis thrown- Message goes directly to
orders-dlqtopic - No retries occur
- Check logs:
"exception IllegalArgumentException is non-retryable for topic: orders - sending directly to dlq"
Send a message that triggers a retryable exception:
@PostMapping("/test-retryable")
public String testRetryable() {
OrderEvent order = new OrderEvent();
order.setAmount(100);
order.setPaymentMethod(null); // This might trigger PaymentException
kafkaTemplate.send("orders", order);
return "Sent order - should retry if PaymentException occurs";
}Expected Behavior:
- Message is processed
- If
PaymentExceptionoccurs, it will be retried up to 3 times - Only after max attempts, it goes to DLQ
- Check logs for retry attempts
Look for these log messages:
# Non-retryable exception (sent directly to DLQ)
INFO: exception IllegalArgumentException is non-retryable for topic: orders - sending directly to dlq
# Retryable exception (gets retried)
DEBUG: exception caught during processing for topic: orders: PaymentException
DEBUG: scheduled retry attempt 1 for event in topic: orders
DEBUG: scheduled retry attempt 2 for event in topic: orders
INFO: max attempts reached (3) for event in topic: orders - sending to dlq: orders-dlqMonitor the DLQ topic to see which messages ended up there:
@Component
public class DLQMonitor {
@KafkaListener(topics = "orders-dlq", groupId = "dlq-monitor")
public void monitorDLQ(ConsumerRecord<String, EventWrapper<?>> record) {
EventWrapper<?> wrapper = record.value();
Exception exception = wrapper.getMetadata().getLastFailureException();
int attempts = wrapper.getMetadata().getAttempts();
System.out.println("DLQ Message:");
System.out.println(" Exception: " + exception.getClass().getSimpleName());
System.out.println(" Attempts: " + attempts);
System.out.println(" Original Event: " + wrapper.getEvent());
// If attempts == 1, it was likely a non-retryable exception
// If attempts == 3, it exhausted retries
}
}# Check DLQ topic for messages
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic orders-dlq \
--from-beginning
# Check main topic for messages (should be empty after processing)
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic orders \
--from-beginning@SpringBootTest
@EmbeddedKafka(topics = {"orders", "orders-dlq"})
class OrderProcessingIntegrationTest {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@Autowired
private DLQMessageCollector dlqCollector;
@Test
void testNonRetryableException_GoesDirectlyToDLQ() {
// Given: An order that triggers IllegalArgumentException
OrderEvent badOrder = new OrderEvent();
badOrder.setAmount(-100);
// When: Send to Kafka
kafkaTemplate.send("orders", badOrder);
// Then: Should go directly to DLQ with 1 attempt
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
EventWrapper<?> dlqMessage = dlqCollector.getLastMessage();
assertNotNull(dlqMessage);
assertEquals(1, dlqMessage.getMetadata().getAttempts());
assertTrue(dlqMessage.getMetadata().getLastFailureException()
instanceof IllegalArgumentException);
});
}
@Test
void testRetryableException_GetsRetried() {
// Given: An order that triggers PaymentException (retryable)
OrderEvent order = new OrderEvent();
order.setAmount(100);
order.setPaymentMethod(null);
// When: Send to Kafka
kafkaTemplate.send("orders", order);
// Then: Should retry 3 times before going to DLQ
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
EventWrapper<?> dlqMessage = dlqCollector.getLastMessage();
assertNotNull(dlqMessage);
assertEquals(3, dlqMessage.getMetadata().getAttempts());
});
}
}-
Start with a Test Environment
- Set up a separate Kafka cluster for testing
- Use test topics and test consumers
-
Create Test Messages
- Use a REST endpoint or admin panel to send test messages
- Create messages that trigger different exception types
-
Monitor Behavior
- Watch application logs in real-time
- Check DLQ topic contents
- Verify retry counts in EventMetadata
-
Verify Metrics (if using Micrometer)
// Check retry metrics Counter retryCounter = meterRegistry.counter("kafka.retry.count", "topic", "orders"); Counter dlqCounter = meterRegistry.counter("kafka.dlq.count", "topic", "orders");
nonRetryableExceptions = {
// Validation errors - shouldn't retry
IllegalArgumentException.class,
ValidationException.class,
NullPointerException.class,
// Business logic errors - shouldn't retry
UnsupportedOperationException.class,
IllegalStateException.class,
// But network/timeout errors - should retry
// (Don't include these in nonRetryableExceptions)
// SocketTimeoutException.class,
// ConnectException.class,
}-
Enable Debug Logging
logging.level.net.damero.Kafka.Aspect=DEBUG -
Check EventWrapper Metadata
- Inspect
EventMetadatain DLQ messages attemptsfield tells you if it was retriedfirstFailureExceptionvslastFailureExceptionshows exception history
- Inspect
-
Use Kafka Tooling
- Kafka Tool, Kafka Manager, or Conduktor to inspect topics
- Check message headers and payloads
-
Add Custom Logging
logger.info("Processing order: {}, Exception type: {}, Is retryable: {}", order.getId(), e.getClass().getSimpleName(), !isNonRetryableException(e, dameroKafkaListener));