Skip to content

Commit 1810409

Browse files
authored
GH-1475: Add KafkaSendCallback
Resolves #1475 Add `onFailure` with a narrowed `Throwable` to make it easier to access the failed producer record. * Make `KafkaFailureCallback` a `FunctionalInterface` and fix javadocs.
1 parent b1f33aa commit 1810409

File tree

6 files changed

+264
-52
lines changed

6 files changed

+264
-52
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import org.springframework.util.concurrent.FailureCallback;
20+
21+
/**
22+
* An enhanced {@link FailureCallback} for reporting
23+
* {@link KafkaProducerException}s.
24+
*
25+
* @param <K> the key type.
26+
* @param <V> the value type.
27+
*
28+
* @author Gary Russell
29+
* @since 2.5
30+
*
31+
*/
32+
@FunctionalInterface
33+
public interface KafkaFailureCallback<K, V> extends FailureCallback {
34+
35+
@Override
36+
default void onFailure(Throwable ex) {
37+
onFailure((KafkaProducerException) ex);
38+
}
39+
40+
/**
41+
* Called when the send fails.
42+
* @param ex the exception.
43+
*/
44+
void onFailure(KafkaProducerException ex);
45+
46+
}

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaProducerException.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,13 +31,37 @@ public class KafkaProducerException extends KafkaException {
3131

3232
private final ProducerRecord<?, ?> producerRecord;
3333

34+
/**
35+
* Construct an instance with the provided properties.
36+
* @param failedProducerRecord the producer record.
37+
* @param message the message.
38+
* @param cause the cause.
39+
*/
3440
public KafkaProducerException(ProducerRecord<?, ?> failedProducerRecord, String message, Throwable cause) {
3541
super(message, cause);
3642
this.producerRecord = failedProducerRecord;
3743
}
3844

45+
/**
46+
* Return the failed producer record.
47+
* @return the record.
48+
* @deprecated in favor of {@link #getFailedProducerRecord()}
49+
*/
50+
@Deprecated
3951
public ProducerRecord<?, ?> getProducerRecord() {
4052
return this.producerRecord;
4153
}
4254

55+
/**
56+
* Return the failed producer record.
57+
* @param <K> the key type.
58+
* @param <V> the value type.
59+
* @return the record.
60+
* @since 2.5
61+
*/
62+
@SuppressWarnings("unchecked")
63+
public <K, V> ProducerRecord<K, V> getFailedProducerRecord() {
64+
return (ProducerRecord<K, V>) producerRecord;
65+
}
66+
4367
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import org.springframework.kafka.support.SendResult;
20+
import org.springframework.util.concurrent.ListenableFutureCallback;
21+
22+
/**
23+
* An enhanced {@link ListenableFutureCallback} for reporting
24+
* {@link KafkaProducerException}s.
25+
*
26+
* @param <K> the key type.
27+
* @param <V> the value type.
28+
*
29+
* @author Gary Russell
30+
* @since 2.5
31+
*
32+
*/
33+
public interface KafkaSendCallback<K, V> extends ListenableFutureCallback<SendResult<K, V>>, KafkaFailureCallback<K, V> {
34+
35+
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
import static org.assertj.core.api.Assertions.allOf;
2020
import static org.assertj.core.api.Assertions.assertThat;
2121
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.BDDMockito.given;
24+
import static org.mockito.BDDMockito.willAnswer;
2225
import static org.mockito.Mockito.mock;
2326
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
2427
import static org.springframework.kafka.test.assertj.KafkaConditions.keyValue;
@@ -41,6 +44,7 @@
4144

4245
import org.apache.kafka.clients.consumer.Consumer;
4346
import org.apache.kafka.clients.consumer.ConsumerRecord;
47+
import org.apache.kafka.clients.producer.Callback;
4448
import org.apache.kafka.clients.producer.Producer;
4549
import org.apache.kafka.clients.producer.ProducerConfig;
4650
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -75,6 +79,7 @@
7579
import org.springframework.messaging.support.MessageBuilder;
7680
import org.springframework.util.concurrent.ListenableFuture;
7781
import org.springframework.util.concurrent.ListenableFutureCallback;
82+
import org.springframework.util.concurrent.SettableListenableFuture;
7883

7984
/**
8085
* @author Gary Russell
@@ -317,7 +322,7 @@ public void onSuccess(ProducerRecord<Integer, String> record, RecordMetadata rec
317322
@Test
318323
void testWithCallback() throws Exception {
319324
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
320-
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
325+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
321326
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
322327
template.setDefaultTopic(INT_KEY_TOPIC);
323328
ListenableFuture<SendResult<Integer, String>> future = template.sendDefault("foo");
@@ -339,7 +344,66 @@ public void onFailure(Throwable ex) {
339344
});
340345
assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo"));
341346
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
342-
pf.createProducer().close();
347+
pf.destroy();
348+
}
349+
350+
@SuppressWarnings("unchecked")
351+
@Test
352+
void testWithCallbackFailure() throws Exception {
353+
Producer<Integer, String> producer = mock(Producer.class);
354+
willAnswer(inv -> {
355+
Callback callback = inv.getArgument(1);
356+
callback.onCompletion(null, new RuntimeException("test"));
357+
return new SettableListenableFuture<RecordMetadata>();
358+
}).given(producer).send(any(), any());
359+
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
360+
given(pf.createProducer()).willReturn(producer);
361+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
362+
ListenableFuture<SendResult<Integer, String>> future = template.send("foo", 1, "bar");
363+
final CountDownLatch latch = new CountDownLatch(1);
364+
final AtomicReference<SendResult<Integer, String>> theResult = new AtomicReference<>();
365+
AtomicReference<String> value = new AtomicReference<>();
366+
future.addCallback(new KafkaSendCallback<Integer, String>() {
367+
368+
@Override
369+
public void onSuccess(SendResult<Integer, String> result) {
370+
}
371+
372+
@Override
373+
public void onFailure(KafkaProducerException ex) {
374+
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
375+
value.set(failed.value());
376+
latch.countDown();
377+
}
378+
379+
});
380+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
381+
assertThat(value.get()).isEqualTo("bar");
382+
}
383+
384+
@SuppressWarnings("unchecked")
385+
@Test
386+
void testWithCallbackFailureFunctional() throws Exception {
387+
Producer<Integer, String> producer = mock(Producer.class);
388+
willAnswer(inv -> {
389+
Callback callback = inv.getArgument(1);
390+
callback.onCompletion(null, new RuntimeException("test"));
391+
return new SettableListenableFuture<RecordMetadata>();
392+
}).given(producer).send(any(), any());
393+
ProducerFactory<Integer, String> pf = mock(ProducerFactory.class);
394+
given(pf.createProducer()).willReturn(producer);
395+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
396+
ListenableFuture<SendResult<Integer, String>> future = template.send("foo", 1, "bar");
397+
final CountDownLatch latch = new CountDownLatch(1);
398+
final AtomicReference<SendResult<Integer, String>> theResult = new AtomicReference<>();
399+
AtomicReference<String> value = new AtomicReference<>();
400+
future.addCallback(result -> { }, (KafkaFailureCallback<Integer, String>) ex -> {
401+
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
402+
value.set(failed.value());
403+
latch.countDown();
404+
});
405+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
406+
assertThat(value.get()).isEqualTo("bar");
343407
}
344408

345409
@Test

0 commit comments

Comments
 (0)