Skip to content

Commit 1a2f671

Browse files
GH 920 - Topic-based retry support (#1664)
* GH-920: Adds the ability to pause partitions and the corresponding events to MessageListenerContainer interface and its implementations. * GH-920: Adds a BackOff Manager for Kafka consumption and a ListenerAdapter that makes use of it. * GH-920: Adds the RetryTopic functionality. See RetryTopicConfigurer.java in the retrytopic package for more information. * GH-920: Some checkstyle adjustments * GH-920: Improvements on the RetryTopicConfigurer javadoc overview of the functionality. * GH-920: - Changes in dependency management - RetryTopicBootstrapper - Some refactorings * GH-920: Many improvements Test implementations * GH-920: CheckStyle corrections * GH-920: Updating RetryTopicConfigurer javadoc * GH-920: Addressing warnings * GH-920: - Created the RetryTopic documentation - Updated relevant parts of the documentation - Minor adjustments * GH-920: Updating RetryTopicConfigurer javadoc * GH-920: Style and documentation adjustments * GH-920: Adjustments on documentation and tests * GH-920: Fixed failing test * GH-920: -- Fixes topic names when there's a duplicated delay value -- Fixes class that has the builder (RetryTopicConfiguration not RetryTopicConfigurer) -- Fixes an error if the RetryTopicBootstrapper has to instantiate the BackOffManager's Clock
1 parent 074f991 commit 1a2f671

File tree

54 files changed

+7885
-13
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+7885
-13
lines changed

spring-kafka-docs/src/main/asciidoc/index.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ include::streams.adoc[]
4343

4444
include::testing.adoc[]
4545

46+
include::retrytopic.adoc[]
47+
4648
[[tips-n-tricks]]
4749
== Tips, Tricks and Examples
4850

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2345,6 +2345,10 @@ The time to process a batch of records plus this value must be less than the `ma
23452345
|`null`
23462346
|When set, enables publication of `ListenerContainerIdleEvent` s, see <<events>> and <<idle-containers>>.
23472347

2348+
|idlePartitionEventInterval
2349+
|`null`
2350+
|When set, enables publication of `ListenerContainerIdlePartitionEvent` s, see <<events>> and <<idle-containers>>.
2351+
23482352
|kafkaConsumerProperties
23492353
|None
23502354
|Used to override any arbitrary consumer properties configured on the consumer factory.
@@ -2578,7 +2582,11 @@ This event might signal that the configured task executor has insufficient threa
25782582
An error message is also logged when this condition occurs.
25792583
* `ListenerContainerIdleEvent`: published when no messages have been received in `idleInterval` (if configured).
25802584
* `ListenerContainerNoLongerIdleEvent`: published when a record is consumed after previously publishing a `ListenerContainerIdleEvent`.
2585+
* `ListenerContainerPartitionIdleEvent`: published when no messages have been received from that partition in `idlePartitionEventInterval` (if configured).
2586+
* `ListenerContainerPartitionNoLongerIdleEvent`: published when a record is consumed from a partition that has previously published a `ListenerContainerPartitionIdleEvent`.
25812587
* `NonResponsiveConsumerEvent`: published when the consumer appears to be blocked in the `poll` method.
2588+
* `ConsumerPartitionPausedEvent`: published by each consumer when a partition is paused.
2589+
* `ConsumerPartitionResumedEvent`: published by each consumer when a partition is resumed.
25822590
* `ConsumerPausedEvent`: published by each consumer when the container is paused.
25832591
* `ConsumerResumedEvent`: published by each consumer when the container is resumed.
25842592
* `ConsumerStoppingEvent`: published by each consumer just before stopping.
@@ -2603,6 +2611,22 @@ See <<pause-resume>> for more information.
26032611

26042612
The `ListenerContainerNoLongerIdleEvent` has the same properties, except `idleTime` and `paused`.
26052613

2614+
2615+
The `ListenerContainerPartitionIdleEvent` has the following properties:
2616+
2617+
* `source`: The listener container instance that published the event.
2618+
* `container`: The listener container or the parent listener container, if the source container is a child.
2619+
* `id`: The listener ID (or container bean name).
2620+
* `idleTime`: The time partition consumption had been idle when the event was published.
2621+
* `topicPartition`: The topic and partition that triggered the event.
2622+
* `consumer`: A reference to the Kafka `Consumer` object.
2623+
For example, if the consumer's `pause()` method was previously called, it can `resume()` when the event is received.
2624+
* `paused`: Whether that partition consumption is currently paused for that consumer.
2625+
See <<pause-resume>> for more information.
2626+
2627+
The `ListenerContainerPartitionNoLongerIdleEvent` has the same properties, except `idleTime` and `paused`.
2628+
2629+
26062630
The `NonResponsiveConsumerEvent` has the following properties:
26072631

26082632
* `source`: The listener container instance that published the event.
@@ -2621,6 +2645,12 @@ The `ConsumerPausedEvent`, `ConsumerResumedEvent`, and `ConsumerStopping` events
26212645
* `container`: The listener container or the parent listener container, if the source container is a child.
26222646
* `partitions`: The `TopicPartition` instances involved.
26232647

2648+
The `ConsumerPartitionPausedEvent`, `ConsumerPartitionResumedEvent` events have the following properties:
2649+
2650+
* `source`: The listener container instance that published the event.
2651+
* `container`: The listener container or the parent listener container, if the source container is a child.
2652+
* `partition`: The `TopicPartition` instance involved.
2653+
26242654
The `ConsumerStartingEvent`, `ConsumerStartingEvent`, `ConsumerFailedToStartEvent`, `ConsumerStoppedEvent` and `ContainerStoppedEvent` events have the following properties:
26252655

26262656
* `source`: The listener container instance that published the event.
@@ -3630,6 +3660,16 @@ thing2
36303660
----
36313661
====
36323662

3663+
[[pause-resume-partitions]]
3664+
==== Pausing and Resuming Partitions on Listener Containers
3665+
3666+
Since version 2.7 you can pause and resume the consumption of specific partitions assigned to that consumer by using the `pausePartition(TopicPartition topicPartition)` and `resumePartition(TopicPartition topicPartition)` methods in the listener containers.
3667+
The pausing and resuming takes place before and after the `poll()` similar to the `pause()` and `resume()` methods.
3668+
The `isPartitionConsumerPaused()` method returns true if pause for that partition has been requested.
3669+
3670+
Also since version 2.7 `ConsumerPartitionPausedEvent` and `ConsumerPartitionResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instance.
3671+
3672+
36333673
[[serdes]]
36343674
==== Serialization, Deserialization, and Message Conversion
36353675

0 commit comments

Comments
 (0)