
Commit 0287046

garyrussell authored and artembilan committed
ReplyingKafkaTemplate: Handle DeserializationException
Handle `DeserializationException` via `ErrorHandlingDeserializer` in `ReplyingKafkaTemplate`. **I will do the backports after review/merge; I expect conflicts**
1 parent 5f4dfcc commit 0287046
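
With this change, a client of `ReplyingKafkaTemplate` whose reply consumer uses `ErrorHandlingDeserializer` sees a failed reply as an exceptionally completed future rather than a record with a `null` value. The following is a minimal caller-side sketch of that behaviour; the topic name, generic types, surrounding class, and the injected template are illustrative assumptions, not part of this commit.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.serializer.DeserializationException;

public class ReplyClientSketch {

    private final ReplyingKafkaTemplate<String, String, String> template;

    public ReplyClientSketch(ReplyingKafkaTemplate<String, String, String> template) {
        this.template = template;
    }

    public String requestReply(String payload) throws Exception {
        // "kRequests" is a hypothetical request topic used only for illustration
        RequestReplyFuture<String, String, String> future =
                this.template.sendAndReceive(new ProducerRecord<>("kRequests", payload));
        try {
            ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);
            return reply.value();
        }
        catch (ExecutionException ex) {
            if (ex.getCause() instanceof DeserializationException) {
                // with this commit, a reply whose key or value could not be deserialized
                // completes the future exceptionally instead of delivering a null value
                throw (DeserializationException) ex.getCause();
            }
            throw ex;
        }
    }

}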

File tree: 3 files changed, +117 -8 lines changed


spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 50 additions & 6 deletions
@@ -37,14 +37,18 @@
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.context.SmartLifecycle;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.BatchMessageListener;
 import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.listener.GenericMessageListenerContainer;
+import org.springframework.kafka.listener.ListenerUtils;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.TopicPartitionOffset;
+import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
 import org.springframework.lang.Nullable;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@@ -92,8 +96,7 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen

     private boolean sharedReplyTopic;

-    private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy =
-            ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
+    private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy;

     private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;

@@ -207,8 +210,8 @@ public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
     }

     /**
-     * Set to true when multiple templates are using the same topic for replies.
-     * This simply changes logs for unexpected replies to debug instead of error.
+     * Set to true when multiple templates are using the same topic for replies. This
+     * simply changes logs for unexpected replies to debug instead of error.
      * @param sharedReplyTopic true if using a shared topic.
      * @since 2.2
      */

@@ -404,13 +407,54 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
                     logLateArrival(record, correlationId);
                 }
                 else {
-                    this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
-                    future.set(record);
+                    boolean ok = true;
+                    if (record.value() == null) {
+                        DeserializationException de = checkDeserialization(record, this.logger);
+                        if (de != null) {
+                            ok = false;
+                            future.setException(de);
+                        }
+                    }
+                    if (ok) {
+                        this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
+                        future.set(record);
+                    }
                 }
             }
         });
     }

+    /**
+     * Return a {@link DeserializationException} if either the key or value failed
+     * deserialization; null otherwise. If you need to determine whether it was the key or
+     * value, call
+     * {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
+     * with {@link ErrorHandlingDeserializer#KEY_DESERIALIZER_EXCEPTION_HEADER} and
+     * {@link ErrorHandlingDeserializer#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
+     * @param record the record.
+     * @param logger a {@link LogAccessor}.
+     * @return the {@link DeserializationException} or {@code null}.
+     * @since 2.2.15
+     */
+    @Nullable
+    public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
+        DeserializationException exception = ListenerUtils.getExceptionFromHeader(record,
+                ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
+        if (exception != null) {
+            logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-"
+                    + record.partition() + "@" + record.offset());
+            return exception;
+        }
+        exception = ListenerUtils.getExceptionFromHeader(record,
+                ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
+        if (exception != null) {
+            logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
+                    + record.partition() + "@" + record.offset());
+            return exception;
+        }
+        return null;
+    }
+
     protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
         if (this.sharedReplyTopic) {
             this.logger.debug(() -> missingCorrelationLogMessage(record, correlationId));
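
The javadoc above points to `ListenerUtils.getExceptionFromHeader` when a caller needs to know whether the key or the value failed. A hedged sketch of that distinction follows; the surrounding class, logger wiring, and the source of the reply record are illustrative assumptions, while the method and header constants are those linked from the javadoc.

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

public class ReplyFailureInspector {

    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ReplyFailureInspector.class));

    // Report whether the key, the value, or neither failed deserialization,
    // using the exception headers populated by ErrorHandlingDeserializer.
    public String describeFailure(ConsumerRecord<?, ?> reply) {
        DeserializationException keyException = ListenerUtils.getExceptionFromHeader(reply,
                ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, LOGGER);
        if (keyException != null) {
            return "reply key failed deserialization: " + keyException.getMessage();
        }
        DeserializationException valueException = ListenerUtils.getExceptionFromHeader(reply,
                ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, LOGGER);
        if (valueException != null) {
            return "reply value failed deserialization: " + valueException.getMessage();
        }
        return "no deserialization failure";
    }

}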

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 59 additions & 2 deletions
@@ -41,6 +41,7 @@
 import java.util.concurrent.atomic.AtomicReference;

 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;

@@ -51,6 +52,7 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;

@@ -78,6 +80,8 @@
 import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
 import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
+import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.kafka.test.utils.KafkaTestUtils;

@@ -104,7 +108,8 @@
         ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
         ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST,
         ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST,
-        ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST })
+        ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST,
+        ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST })
 public class ReplyingKafkaTemplateTests {

     public static final String A_REPLY = "aReply";

@@ -143,6 +148,10 @@ public class ReplyingKafkaTemplateTests {

     public static final String I_REQUEST = "iRequest";

+    public static final String J_REPLY = "jReply";
+
+    public static final String J_REQUEST = "jRequest";
+
     @Autowired
     private EmbeddedKafkaBroker embeddedKafka;


@@ -189,6 +198,25 @@ public void testGood() throws Exception {
         }
     }

+    @Test
+    public void testBadDeserialize() throws Exception {
+        ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(J_REPLY, true);
+        try {
+            template.setDefaultReplyTimeout(Duration.ofSeconds(30));
+            Headers headers = new RecordHeaders();
+            headers.add("baz", "buz".getBytes());
+            ProducerRecord<Integer, String> record = new ProducerRecord<>(J_REQUEST, null, null, null, "foo", headers);
+            RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
+            future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
+            assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future.get(10, TimeUnit.SECONDS))
+                    .withCauseExactlyInstanceOf(DeserializationException.class);
+        }
+        finally {
+            template.stop();
+            template.destroy();
+        }
+    }
+
     @Test
     public void testMultiListenerMessageReturn() throws Exception {
         ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(C_REPLY);

@@ -477,6 +505,12 @@ public void testAggregateOrphansNotStored() throws Exception {
     }

     public ReplyingKafkaTemplate<Integer, String, String> createTemplate(String topic) throws Exception {
+        return createTemplate(topic, false);
+    }
+
+    public ReplyingKafkaTemplate<Integer, String, String> createTemplate(String topic, boolean badDeser)
+            throws Exception {
+
         ContainerProperties containerProperties = new ContainerProperties(topic);
         final CountDownLatch latch = new CountDownLatch(1);
         containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

@@ -493,6 +527,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

         });
         Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka);
+        if (badDeser) {
+            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
+            consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, BadDeser.class);
+        }
         DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
         KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
                 containerProperties);

@@ -508,7 +546,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
     }

     public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionOffset topic) {
-
         ContainerProperties containerProperties = new ContainerProperties(topic);
         Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka);
         DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

@@ -687,6 +724,12 @@ public Message<?> messageReturn(String in) {
                 .build();
     }

+    @KafkaListener(id = J_REQUEST, topics = J_REQUEST)
+    @SendTo  // default REPLY_TOPIC header
+    public String handleJ(String in) throws InterruptedException {
+        return in.toUpperCase();
+    }
+
 }

 @KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)

@@ -717,4 +760,18 @@ public String listen1(String in) {

     }

+    public static class BadDeser implements Deserializer<Object> {
+
+        @Override
+        public Object deserialize(String topic, byte[] data) {
+            return null;
+        }
+
+        @Override
+        public Object deserialize(String topic, Headers headers, byte[] data) {
+            throw new IllegalStateException("test reply deserialization failure");
+        }
+
+    }
+
 }

src/reference/asciidoc/kafka.adoc

Lines changed: 8 additions & 0 deletions
@@ -589,6 +589,9 @@

 Note that we can use Boot's auto-configured container factory to create the reply container.

+If a non-trivial deserializer is being used for replies, consider using an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> that delegates to your configured deserializer.
+When so configured, the `RequestReplyFuture` will be completed exceptionally and you can catch the `ExecutionException`, with the `DeserializationException` in its `cause` property.
+
 The template sets a header (named `KafkaHeaders.CORRELATION_ID` by default), which must be echoed back by the server side.

 In this case, the following `@KafkaListener` application responds:
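
Relating to the `ErrorHandlingDeserializer` note added in this hunk (not to the listener referenced in the last context line above), here is a hedged sketch of a reply consumer factory that wraps a delegate deserializer. The bootstrap servers, group id, and `JsonDeserializer` delegate are assumptions; only the `ErrorHandlingDeserializer` property names and the Kafka/Spring classes are real.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class ReplyConsumerConfigSketch {

    // Sketch only: bootstrap servers, group id, and the JsonDeserializer delegate
    // are assumptions; substitute your actual reply value deserializer and its
    // settings (e.g. JsonDeserializer trusted packages), which are omitted here.
    public DefaultKafkaConsumerFactory<String, Object> replyConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repliesGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // wrap the real value deserializer so a failure is recorded in headers
        // instead of repeatedly failing the reply container's poll loop
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

}
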
@@ -771,6 +774,11 @@ IMPORTANT: The listener container for the replies MUST be configured with `AckMo
 To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy.
 After a rebalance, it is possible for duplicate reply deliveries; these will be ignored for any in-flight requests; you may see error log messages when duplicate replies are received for already released replies.

+NOTE: If you use an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> with this aggregating template, the framework will not automatically detect `DeserializationException` s.
+Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.
+It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` to determine whether a deserialization exception occurred.
+See its javadocs for more information.
+
 [[receiving-messages]]
 ==== Receiving Messages

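
A hedged sketch of the check that the NOTE above recommends for the aggregating case; the surrounding class, logger wiring, and generic types are assumptions, while `ReplyingKafkaTemplate.checkDeserialization()` is the static method added by this commit.

import java.util.Collection;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.serializer.DeserializationException;

public class AggregatedReplyCheckSketch {

    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(AggregatedReplyCheckSketch.class));

    // Inspect each record of an aggregated reply; a record whose key or value failed
    // deserialization arrives with a null value and the exception in a header.
    public void inspect(Collection<ConsumerRecord<String, String>> aggregatedReplies) {
        for (ConsumerRecord<String, String> reply : aggregatedReplies) {
            DeserializationException ex = ReplyingKafkaTemplate.checkDeserialization(reply, LOGGER);
            if (ex != null) {
                // handle the failed reply, e.g. skip it or fail the whole request
                continue;
            }
            // process reply.value() as usual
        }
    }

}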