Skip to content

Commit 321b18f

Browse files
authored
Add RecordMetadata to ProducerListener.onError
When sending a record with no partition, and an error occurs, the `RecordMetadata` might contain the assigned partition if the error occurs after partition assignment; this may be useful information for the application. * Add @nullable
1 parent be0cc7c commit 321b18f

File tree

6 files changed

+46
-8
lines changed

6 files changed

+46
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition,
385385

386386
@Override
387387
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
388+
Assert.notNull(record, "'record' cannot be null");
388389
return doSend(record);
389390
}
390391

@@ -601,7 +602,7 @@ private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final
601602
}
602603
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
603604
if (KafkaTemplate.this.producerListener != null) {
604-
KafkaTemplate.this.producerListener.onError(producerRecord, exception);
605+
KafkaTemplate.this.producerListener.onError(producerRecord, metadata, exception);
605606
}
606607
KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " + producerRecord);
607608
}

spring-kafka/src/main/java/org/springframework/kafka/support/CompositeProducerListener.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,8 +74,14 @@ public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata record
7474
}
7575

7676
@Override
77+
@Deprecated
7778
public void onError(ProducerRecord<K, V> producerRecord, Exception exception) {
7879
this.delegates.forEach(d -> d.onError(producerRecord, exception));
7980
}
8081

82+
@Override
83+
public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
84+
this.delegates.forEach(d -> d.onError(producerRecord, recordMetadata, exception));
85+
}
86+
8187
}

spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import org.apache.commons.logging.LogFactory;
2020
import org.apache.kafka.clients.producer.ProducerRecord;
21+
import org.apache.kafka.clients.producer.RecordMetadata;
2122

2223
import org.springframework.core.log.LogAccessor;
24+
import org.springframework.lang.Nullable;
2325
import org.springframework.util.ObjectUtils;
2426

2527
/**
@@ -65,7 +67,7 @@ public void setMaxContentLogged(int maxContentLogged) {
6567
}
6668

6769
@Override
68-
public void onError(ProducerRecord<K, V> record, Exception exception) {
70+
public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) {
6971
LOGGER.error(exception, () -> {
7072
StringBuffer logOutput = new StringBuffer();
7173
logOutput.append("Exception thrown when sending a message");
@@ -79,7 +81,9 @@ public void onError(ProducerRecord<K, V> record, Exception exception) {
7981
}
8082
logOutput.append(" to topic ").append(record.topic());
8183
if (record.partition() != null) {
82-
logOutput.append(" and partition ").append(record.partition());
84+
logOutput.append(" and partition ").append(recordMetadata != null
85+
? recordMetadata.partition()
86+
: record.partition());
8387
}
8488
logOutput.append(":");
8589
return logOutput.toString();

spring-kafka/src/main/java/org/springframework/kafka/support/ProducerListener.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@
1919
import org.apache.kafka.clients.producer.ProducerRecord;
2020
import org.apache.kafka.clients.producer.RecordMetadata;
2121

22+
import org.springframework.lang.Nullable;
23+
2224
/**
2325
* Listener for handling outbound Kafka messages. Exactly one of its methods will be invoked, depending on whether
2426
* the write has been acknowledged or not.
@@ -49,8 +51,28 @@ default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recor
4951
* Invoked after an attempt to send a message has failed.
5052
* @param producerRecord the failed record
5153
* @param exception the exception thrown
54+
* @deprecated in favor of {@link #onError(ProducerRecord, RecordMetadata, Exception)}.
5255
*/
56+
@Deprecated
5357
default void onError(ProducerRecord<K, V> producerRecord, Exception exception) {
5458
}
5559

60+
/**
61+
* Invoked after an attempt to send a message has failed.
62+
* @param producerRecord the failed record
63+
* @param recordMetadata The metadata for the record that was sent (i.e. the partition
64+
* and offset). If an error occurred, metadata will contain only valid topic and maybe
65+
* the partition. If the partition is not provided in the ProducerRecord and an error
66+
* occurs before partition is assigned, then the partition will be set to
67+
* RecordMetadata.UNKNOWN_PARTITION.
68+
* @param exception the exception thrown
69+
* @since 2.6.2
70+
*/
71+
@SuppressWarnings("deprecation")
72+
default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,
73+
Exception exception) {
74+
75+
onError(producerRecord, exception);
76+
}
77+
5678
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.kafka.common.Metric;
5454
import org.apache.kafka.common.MetricName;
5555
import org.apache.kafka.common.PartitionInfo;
56+
import org.apache.kafka.common.TopicPartition;
5657
import org.apache.kafka.common.errors.TimeoutException;
5758
import org.apache.kafka.common.header.Header;
5859
import org.apache.kafka.common.serialization.Serializer;
@@ -276,7 +277,9 @@ public void onSuccess(ProducerRecord<Integer, String> record, RecordMetadata rec
276277
}
277278

278279
@Override
279-
public void onError(ProducerRecord<Integer, String> producerRecord, Exception exception) {
280+
public void onError(ProducerRecord<Integer, String> producerRecord, RecordMetadata metadata,
281+
Exception exception) {
282+
280283
assertThat(producerRecord).isNotNull();
281284
assertThat(exception).isNotNull();
282285
onErrorDelegateCalls.incrementAndGet();
@@ -298,7 +301,8 @@ public void onError(ProducerRecord<Integer, String> producerRecord, Exception ex
298301
//Drain the topic
299302
KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
300303
pf.destroy();
301-
cpl.onError(records.get(0), new RuntimeException("x"));
304+
cpl.onError(records.get(0), new RecordMetadata(new TopicPartition(INT_KEY_TOPIC, -1), 0L, 0L, 0L, 0L, 0, 0),
305+
new RuntimeException("x"));
302306
assertThat(onErrorDelegateCalls.get()).isEqualTo(2);
303307
}
304308

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ public interface ProducerListener<K, V> {
309309
310310
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
311311
312-
void onError(ProducerRecord<K, V> producerRecord, Exception exception);
312+
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
313+
Exception exception);
313314
314315
}
315316
----

0 commit comments

Comments
 (0)