Skip to content

Commit 257bff5

Browse files
tomazfernandesgaryrussell
authored andcommitted
- Added the ability to set a global timeout for the retries - Added isPartitionPaused to MessageListenerContainer - Added new features to documentation - Added the ability to choose between suffixing the topics with the delay or it's index - Added the ability of not using a DLT - Changed the Backoff Timestamp Header from String (formatted LocalDateTime) to long (millis since epoch UTC) - Unwraps ListenerExecutionFailedException so the cause can be classified - A few cleanups in the API - Increased test coverage - Style adjustments
1 parent 1a2f671 commit 257bff5

File tree

48 files changed

+1620
-451
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1620
-451
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3664,8 +3664,9 @@ thing2
36643664
==== Pausing and Resuming Partitions on Listener Containers
36653665

36663666
Since version 2.7 you can pause and resume the consumption of specific partitions assigned to that consumer by using the `pausePartition(TopicPartition topicPartition)` and `resumePartition(TopicPartition topicPartition)` methods in the listener containers.
3667-
The pausing and resuming takes place before and after the `poll()` similar to the `pause()` and `resume()` methods.
3668-
The `isPartitionConsumerPaused()` method returns true if pause for that partition has been requested.
3667+
The pausing and resuming takes place respectively before and after the `poll()` similar to the `pause()` and `resume()` methods.
3668+
The `isPartitionPauseRequested()` method returns true if pause for that partition has been requested.
3669+
The `isPartitionPaused()` method returns true if that partition has effectively been paused.
36693670

36703671
Also since version 2.7 `ConsumerPartitionPausedEvent` and `ConsumerPartitionResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instance.
36713672

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 183 additions & 74 deletions
Large diffs are not rendered by default.

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import java.lang.annotation.RetentionPolicy;
2323
import java.lang.annotation.Target;
2424

25-
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
25+
import org.springframework.kafka.retrytopic.DltStrategy;
26+
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
2627
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
27-
import org.springframework.kafka.retrytopic.destinationtopic.DestinationTopicPropertiesFactory;
28+
import org.springframework.kafka.retrytopic.RetryTopicConstants;
29+
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
2830
import org.springframework.retry.annotation.Backoff;
2931
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
3032

@@ -38,7 +40,7 @@
3840
*
3941
* @see RetryTopicConfigurer
4042
*/
41-
@Target({ ElementType.METHOD }) //, ElementType.TYPE }) TODO: Enable class level annotation
43+
@Target({ ElementType.METHOD })
4244
@Retention(RetentionPolicy.RUNTIME)
4345
@Documented
4446
public @interface RetryableTopic {
@@ -58,6 +60,16 @@
5860
*/
5961
Backoff backoff() default @Backoff;
6062

63+
/**
64+
*
65+
* The amount of time in milliseconds after which message retrying should give up
66+
* and send the message to the DLT.
67+
*
68+
* @return the timeout value.
69+
*
70+
*/
71+
long timeout() default RetryTopicConstants.NOT_SET;
72+
6173
/**
6274
*
6375
* The bean name of the {@link org.springframework.kafka.core.KafkaTemplate} bean
@@ -130,19 +142,28 @@
130142
*
131143
* @return the retry topics' suffix.
132144
*/
133-
String retryTopicSuffix() default DestinationTopicPropertiesFactory.DestinationTopicSuffixes.DEFAULT_RETRY_SUFFIX;
145+
String retryTopicSuffix() default RetryTopicConstants.DEFAULT_RETRY_SUFFIX;
146+
147+
/**
148+
* The suffix that will be appended to the main topic in order to generate
149+
* the dlt topic.
150+
*
151+
* @return the dlt suffix.
152+
*/
153+
String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX;
154+
134155

135156
/**
136157
* The suffix that will be appended to the main topic in order to generate
137158
* the dlt topic.
138159
*
139160
* @return the dlt suffix.
140161
*/
141-
String dltTopicSuffix() default DestinationTopicPropertiesFactory.DestinationTopicSuffixes.DEFAULT_DLT_SUFFIX;
162+
TopicSuffixingStrategy topicSuffixingStrategy() default TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE;
142163

143-
RetryTopicConfiguration.DltProcessingFailureStrategy dltProcessingFailureStrategy()
144-
default RetryTopicConfiguration.DltProcessingFailureStrategy.ALWAYS_RETRY;
164+
DltStrategy dltStrategy()
165+
default DltStrategy.ALWAYS_RETRY_ON_ERROR;
145166

146-
RetryTopicConfiguration.FixedDelayTopicStrategy fixedDelayTopicStrategy()
147-
default RetryTopicConfiguration.FixedDelayTopicStrategy.MULTIPLE_TOPICS;
167+
FixedDelayStrategy fixedDelayTopicStrategy()
168+
default FixedDelayStrategy.MULTIPLE_TOPICS;
148169
}

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
108108

109109
private ApplicationContext applicationContext;
110110

111-
private final Set<TopicPartition> pausedPartitions;
111+
private final Set<TopicPartition> pauseRequestedPartitions;
112112

113113
/**
114114
* Construct an instance with the provided factory and properties.
@@ -151,7 +151,7 @@ else if (containerProperties.getTopicPartitions() != null) {
151151
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
152152
}
153153

154-
this.pausedPartitions = new HashSet<>();
154+
this.pauseRequestedPartitions = new HashSet<>();
155155
}
156156

157157
@Override
@@ -242,22 +242,22 @@ protected boolean isPaused() {
242242

243243
@Override
244244
public boolean isPartitionPauseRequested(TopicPartition topicPartition) {
245-
synchronized (this.pausedPartitions) {
246-
return this.pausedPartitions.contains(topicPartition);
245+
synchronized (this.pauseRequestedPartitions) {
246+
return this.pauseRequestedPartitions.contains(topicPartition);
247247
}
248248
}
249249

250250
@Override
251251
public void pausePartition(TopicPartition topicPartition) {
252-
synchronized (this.pausedPartitions) {
253-
this.pausedPartitions.add(topicPartition);
252+
synchronized (this.pauseRequestedPartitions) {
253+
this.pauseRequestedPartitions.add(topicPartition);
254254
}
255255
}
256256

257257
@Override
258258
public void resumePartition(TopicPartition topicPartition) {
259-
synchronized (this.pausedPartitions) {
260-
this.pausedPartitions.remove(topicPartition);
259+
synchronized (this.pauseRequestedPartitions) {
260+
this.pauseRequestedPartitions.remove(topicPartition);
261261
}
262262
}
263263

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,14 @@ public void resumePartition(TopicPartition topicPartition) {
310310
}
311311
}
312312

313+
@Override
314+
public boolean isPartitionPaused(TopicPartition topicPartition) {
315+
return this
316+
.containers
317+
.stream()
318+
.anyMatch(container -> container.isPartitionPaused(topicPartition));
319+
}
320+
313321
private boolean containsPartition(TopicPartition topicPartition, KafkaMessageListenerContainer<K, V> container) {
314322
Collection<TopicPartition> assignedPartitions = container.getAssignedPartitions();
315323
return assignedPartitions != null && assignedPartitions.contains(topicPartition);

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaBackoffException.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class KafkaBackoffException extends KafkaException {
3434

3535
private final TopicPartition topicPartition;
3636

37-
private final String dueTimestamp;
37+
private final long dueTimestamp;
3838

3939
/**
4040
* Constructor with data from the BackOff event.
@@ -44,7 +44,7 @@ public class KafkaBackoffException extends KafkaException {
4444
* @param listenerId the listenerId for the consumer that was backed off.
4545
* @param dueTimestamp the time at which the message should be consumed.
4646
*/
47-
public KafkaBackoffException(String message, TopicPartition topicPartition, String listenerId, String dueTimestamp) {
47+
public KafkaBackoffException(String message, TopicPartition topicPartition, String listenerId, long dueTimestamp) {
4848
super(message);
4949
this.listenerId = listenerId;
5050
this.topicPartition = topicPartition;
@@ -59,7 +59,7 @@ public TopicPartition getTopicPartition() {
5959
return this.topicPartition;
6060
}
6161

62-
public String getDueTimestamp() {
62+
public long getDueTimestamp() {
6363
return this.dueTimestamp;
6464
}
6565
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,19 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.Clock;
20-
import java.time.LocalDateTime;
20+
import java.time.Instant;
2121
import java.time.temporal.ChronoUnit;
2222
import java.util.HashMap;
2323
import java.util.Map;
2424

2525
import org.apache.commons.logging.LogFactory;
2626
import org.apache.kafka.common.TopicPartition;
2727

28+
import org.springframework.beans.factory.annotation.Qualifier;
2829
import org.springframework.context.ApplicationListener;
2930
import org.springframework.core.log.LogAccessor;
3031
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
3132
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
32-
import org.springframework.kafka.retrytopic.RetryTopicHeaders;
3333

3434
/**
3535
*
@@ -45,21 +45,26 @@ public class KafkaConsumerBackoffManager implements ApplicationListener<Listener
4545

4646
private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));
4747

48+
/**
49+
* Internal Back Off Clock Bean Name.
50+
*/
51+
public static final String INTERNAL_BACKOFF_CLOCK_BEAN_NAME = "internalBackOffClock";
52+
4853
private final KafkaListenerEndpointRegistry registry;
4954

5055
private final Map<TopicPartition, Context> backOffTimes;
5156

5257
private final Clock clock;
5358

5459
public KafkaConsumerBackoffManager(KafkaListenerEndpointRegistry registry,
55-
Clock clock) {
60+
@Qualifier(INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
5661
this.registry = registry;
5762
this.clock = clock;
5863
this.backOffTimes = new HashMap<>();
5964
}
6065

6166
public void maybeBackoff(Context context) {
62-
long backoffTime = ChronoUnit.MILLIS.between(LocalDateTime.now(this.clock), context.dueTimestamp);
67+
long backoffTime = ChronoUnit.MILLIS.between(Instant.now(this.clock), Instant.ofEpochMilli(context.dueTimestamp));
6368
if (backoffTime > 0) {
6469
pauseConsumptionAndThrow(context, backoffTime);
6570
}
@@ -71,8 +76,7 @@ private void pauseConsumptionAndThrow(Context context, Long timeToSleep) throws
7176
addBackoff(context, topicPartition);
7277
throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " +
7378
"backing off for approx. %s millis.", context.topicPartition.partition(), context.topicPartition.topic(), timeToSleep),
74-
topicPartition, context.listenerId, context.dueTimestamp.format(
75-
RetryTopicHeaders.DEFAULT_BACKOFF_TIMESTAMP_HEADER_FORMATTER));
79+
topicPartition, context.listenerId, context.dueTimestamp);
7680
}
7781

7882
private MessageListenerContainer getListenerContainerFromContext(Context context) {
@@ -82,7 +86,7 @@ private MessageListenerContainer getListenerContainerFromContext(Context context
8286
@Override
8387
public void onApplicationEvent(ListenerContainerPartitionIdleEvent partitionIdleEvent) {
8488
Context context = getBackoff(partitionIdleEvent.getTopicPartition());
85-
if (context == null || LocalDateTime.now(this.clock).isBefore(context.dueTimestamp)) {
89+
if (context == null || Instant.now(this.clock).isBefore(Instant.ofEpochMilli(context.dueTimestamp))) {
8690
return;
8791
}
8892
MessageListenerContainer container = getListenerContainerFromContext(context);
@@ -108,7 +112,7 @@ protected void removeBackoff(TopicPartition topicPartition) {
108112
}
109113
}
110114

111-
public Context createContext(LocalDateTime dueTimestamp, String listenerId, TopicPartition topicPartition) {
115+
public Context createContext(long dueTimestamp, String listenerId, TopicPartition topicPartition) {
112116
return new Context(dueTimestamp, listenerId, topicPartition);
113117
}
114118

@@ -117,7 +121,7 @@ public static class Context {
117121
/**
118122
* The time after which the message should be processed.
119123
*/
120-
final LocalDateTime dueTimestamp;
124+
final long dueTimestamp;
121125

122126
/**
123127
* The id for the listener that should be paused.
@@ -129,7 +133,7 @@ public static class Context {
129133
*/
130134
final TopicPartition topicPartition;
131135

132-
Context(LocalDateTime dueTimestamp, String listenerId, TopicPartition topicPartition) {
136+
Context(long dueTimestamp, String listenerId, TopicPartition topicPartition) {
133137
this.dueTimestamp = dueTimestamp;
134138
this.listenerId = listenerId;
135139
this.topicPartition = topicPartition;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,12 @@ public boolean isContainerPaused() {
279279
return isPaused() && this.listenerConsumer != null && this.listenerConsumer.isConsumerPaused();
280280
}
281281

282+
@Override
283+
public boolean isPartitionPaused(TopicPartition topicPartition) {
284+
return this.listenerConsumer != null && this.listenerConsumer
285+
.isPartitionPaused(topicPartition);
286+
}
287+
282288
@Override
283289
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
284290
ListenerConsumer listenerConsumerForMetrics = this.listenerConsumer;
@@ -682,6 +688,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
682688

683689
private volatile long lastPoll = System.currentTimeMillis();
684690

691+
private Set<TopicPartition> pausedPartitions;
692+
685693
@SuppressWarnings(UNCHECKED)
686694
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
687695
Properties consumerProperties = propertiesFromProperties();
@@ -769,6 +777,7 @@ else if (listener instanceof MessageListener) {
769777
this.lastReceivePartition = new HashMap<>();
770778
this.lastAlertPartition = new HashMap<>();
771779
this.wasIdlePartition = new HashMap<>();
780+
this.pausedPartitions = new HashSet<>();
772781
}
773782

774783
private Properties propertiesFromProperties() {
@@ -900,6 +909,10 @@ boolean isConsumerPaused() {
900909
return this.consumerPaused;
901910
}
902911

912+
boolean isPartitionPaused(TopicPartition topicPartition) {
913+
return this.pausedPartitions.contains(topicPartition);
914+
}
915+
903916
@Nullable
904917
private TransactionTemplate determineTransactionTemplate() {
905918
if (this.kafkaTxManager != null) {
@@ -1434,28 +1447,32 @@ private void resumeConsumerIfNeccessary() {
14341447
}
14351448

14361449
private void pausePartitionsIfNecessary() {
1437-
Set<TopicPartition> pausedPartitions = this.consumer.paused();
1450+
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
14381451
List<TopicPartition> partitionsToPause = this
14391452
.assignedPartitions
14401453
.stream()
1441-
.filter(tp -> isPartitionPauseRequested(tp) && !pausedPartitions.contains(tp))
1454+
.filter(tp -> isPartitionPauseRequested(tp)
1455+
&& !pausedConsumerPartitions.contains(tp))
14421456
.collect(Collectors.toList());
14431457
if (partitionsToPause.size() > 0) {
14441458
this.consumer.pause(partitionsToPause);
1459+
this.pausedPartitions.addAll(partitionsToPause);
14451460
this.logger.debug(() -> "Paused consumption from " + partitionsToPause);
14461461
partitionsToPause.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionPausedEvent);
14471462
}
14481463
}
14491464

14501465
private void resumePartitionsIfNecessary() {
1451-
Set<TopicPartition> pausedPartitions = this.consumer.paused();
1466+
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
14521467
List<TopicPartition> partitionsToResume = this
14531468
.assignedPartitions
14541469
.stream()
1455-
.filter(tp -> !isPartitionPauseRequested(tp) && pausedPartitions.contains(tp))
1470+
.filter(tp -> !isPartitionPauseRequested(tp)
1471+
&& pausedConsumerPartitions.contains(tp))
14561472
.collect(Collectors.toList());
14571473
if (partitionsToResume.size() > 0) {
14581474
this.consumer.resume(partitionsToResume);
1475+
this.pausedPartitions.removeAll(partitionsToResume);
14591476
this.logger.debug(() -> "Resumed consumption from " + partitionsToResume);
14601477
partitionsToResume.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionResumedEvent);
14611478
}

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,16 @@ default boolean isPartitionPauseRequested(TopicPartition topicPartition) {
131131
throw new UnsupportedOperationException("This container doesn't support pausing a partition");
132132
}
133133

134+
/**
135+
* Whether or not this topic's partition is currently paused.
136+
* @param topicPartition the topic partition to check
137+
* @return true if this partition has been paused.
138+
* @since 2.7
139+
*/
140+
default boolean isPartitionPaused(TopicPartition topicPartition) {
141+
throw new UnsupportedOperationException("This container doesn't support checking if a partition is paused");
142+
}
143+
134144
/**
135145
* Return true if {@link #pause()} has been called; the container might not have actually
136146
* paused yet.

0 commit comments

Comments
 (0)