Skip to content

Commit 786b8b1

Browse files
garyrussellartembilan
authored andcommitted
GH-1633: Do not stop parent container when fenced
Resolves #1633 Only the child container instance should be stopped when a producer is fenced. Tested with a Boot Application: ```java @SpringBootApplication public class Kgh1633Application { public static void main(String[] args) { SpringApplication.run(Kgh1633Application.class, args); } @Autowired KafkaTemplate<String, String> template; @KafkaListener(id = "kgh1633a", topics = "kgh1633") public void listen(String in) throws InterruptedException { System.out.println(in); this.template.send("kgh1633-1", "foo"); Thread.sleep(10_000); } @bean public NewTopic topic1() { return TopicBuilder.name("kgh1633").partitions(1).replicas(1).build(); } @bean public NewTopic topic2() { return TopicBuilder.name("kgh1633-1").partitions(1).replicas(1).build(); } // @eventlistener // public void started(ConsumerStartedEvent event) { // // temporary until GH-1633 is fixed // KafkaMessageListenerContainer<?, ?> container = (KafkaMessageListenerContainer<?, ?>) event.getSource(); // container.setEmergencyStop(() -> container.stop(() -> { })); // } @eventlistener public void stopped(ConsumerStoppedEvent event) { ((KafkaMessageListenerContainer<?, ?>) event.getSource()).start(); } } @component class Customizer { Customizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) { factory.getContainerProperties().setStopContainerWhenFenced(true); } } ``` ```properties spring.kafka.producer.transaction-id-prefix=tx- spring.kafka.producer.acks=all spring.kafka.producer.properties.transaction.timeout.ms=5000 ```
1 parent 8b52f56 commit 786b8b1

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1088,7 +1088,11 @@ public void run() {
10881088
+ "' has been fenced");
10891089
break;
10901090
}
1091-
catch (StopAfterFenceException | Error e) { // NOSONAR - rethrown
1091+
catch (StopAfterFenceException e) {
1092+
this.logger.error(e, "Stopping container due to fencing");
1093+
stop();
1094+
}
1095+
catch (Error e) { // NOSONAR - rethrown
10921096
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
10931097
if (runnable != null) {
10941098
runnable.run();

0 commit comments

Comments
 (0)