Skip to content

Commit 0d45644

Browse files
garyrussell authored and artembilan committed
GH-1252: Add more consumer lifecycle events
Resolves #1252 - also log an error when a consumer fails to start * Fix publish.
1 parent e90d9e6 commit 0d45644

File tree

8 files changed

+268
-4
lines changed

8 files changed

+268
-4
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2018-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
/**
20+
* An event published when a consumer fails to start.
* The listener container publishes it when the consumer thread has not begun
* running within the configured consumer start timeout.
21+
*
22+
* @author Gary Russell
23+
* @since 2.3
24+
*
25+
*/
26+
public class ConsumerFailedToStartEvent extends KafkaEvent {
27+
28+
private static final long serialVersionUID = 1L;
29+
30+
/**
31+
* Construct an instance with the provided source and container.
32+
* @param source the container instance that generated the event.
33+
* @param container the container or the parent container if the container is a child.
34+
*/
35+
public ConsumerFailedToStartEvent(Object source, Object container) {
36+
super(source, container);
37+
}
38+
39+
/**
* Return a short description of this event that includes its source.
*/
@Override
40+
public String toString() {
41+
return "ConsumerFailedToStartEvent [source=" + getSource() + "]";
42+
}
43+
44+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2018-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
/**
20+
* An event published when a consumer has started.
* It is published from the consumer thread once its set-up is complete,
* immediately before the polling loop is entered.
21+
*
22+
* @author Gary Russell
23+
* @since 2.3
24+
*
25+
*/
26+
public class ConsumerStartedEvent extends KafkaEvent {
27+
28+
private static final long serialVersionUID = 1L;
29+
30+
/**
31+
* Construct an instance with the provided source and container.
32+
* @param source the container instance that generated the event.
33+
* @param container the container or the parent container if the container is a child.
34+
*/
35+
public ConsumerStartedEvent(Object source, Object container) {
36+
super(source, container);
37+
}
38+
39+
/**
* Return a short description of this event that includes its source.
*/
@Override
40+
public String toString() {
41+
return "ConsumerStartedEvent [source=" + getSource() + "]";
42+
}
43+
44+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2018-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
/**
20+
* An event published when a consumer is initializing.
* It is published as the first action of the consumer's {@code run()} method,
* that is, as soon as a thread of the consumer task executor begins executing it.
21+
*
22+
* @author Gary Russell
23+
* @since 2.3
24+
*
25+
*/
26+
public class ConsumerStartingEvent extends KafkaEvent {
27+
28+
private static final long serialVersionUID = 1L;
29+
30+
/**
31+
* Construct an instance with the provided source and container.
32+
* @param source the container instance that generated the event.
33+
* @param container the container or the parent container if the container is a child.
34+
*/
35+
public ConsumerStartingEvent(Object source, Object container) {
36+
super(source, container);
37+
}
38+
39+
/**
* Return a short description of this event that includes its source.
*/
@Override
40+
public String toString() {
41+
return "ConsumerStartingEvent [source=" + getSource() + "]";
42+
}
43+
44+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ public enum AckMode {
107107
*/
108108
public static final float DEFAULT_NO_POLL_THRESHOLD = 3f;
109109

110+
private static final Duration DEFAULT_CONSUMER_START_TIMEOUT = Duration.ofSeconds(30);
111+
110112
private final Map<String, String> micrometerTags = new HashMap<>();
111113

112114
/**
@@ -176,6 +178,8 @@ public enum AckMode {
176178

177179
private boolean micrometerEnabled = true;
178180

181+
private Duration consumerStartTimout = DEFAULT_CONSUMER_START_TIMEOUT;
182+
179183
/**
180184
* Create properties for a container that will subscribe to the specified topics.
181185
* @param topics the topics.
@@ -570,6 +574,20 @@ public Map<String, String> getMicrometerTags() {
570574
return Collections.unmodifiableMap(this.micrometerTags);
571575
}
572576

577+
/**
* Return the timeout to wait for a consumer thread to start before an
* error is logged. Default 30 seconds.
* @return the consumer start timeout.
*/
public Duration getConsumerStartTimout() {
578+
return this.consumerStartTimout;
579+
}
580+
581+
/**
582+
* Set the timeout to wait for a consumer thread to start before logging
583+
* an error. Default 30 seconds.
* When the timeout expires, the container also publishes a
* {@code ConsumerFailedToStartEvent}.
584+
* @param consumerStartTimout the consumer start timeout; must not be null.
585+
*/
586+
public void setConsumerStartTimout(Duration consumerStartTimout) {
587+
Assert.notNull(consumerStartTimout, "'consumerStartTimout' cannot be null");
588+
this.consumerStartTimout = consumerStartTimout;
589+
}
590+
573591
@Override
574592
public String toString() {
575593
return "ContainerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Set;
3131
import java.util.concurrent.BlockingQueue;
3232
import java.util.concurrent.ConcurrentHashMap;
33+
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.LinkedBlockingQueue;
3435
import java.util.concurrent.ScheduledFuture;
3536
import java.util.concurrent.TimeUnit;
@@ -61,8 +62,11 @@
6162
import org.springframework.kafka.core.ConsumerFactory;
6263
import org.springframework.kafka.core.KafkaResourceHolder;
6364
import org.springframework.kafka.core.ProducerFactory;
65+
import org.springframework.kafka.event.ConsumerFailedToStartEvent;
6466
import org.springframework.kafka.event.ConsumerPausedEvent;
6567
import org.springframework.kafka.event.ConsumerResumedEvent;
68+
import org.springframework.kafka.event.ConsumerStartedEvent;
69+
import org.springframework.kafka.event.ConsumerStartingEvent;
6670
import org.springframework.kafka.event.ConsumerStoppedEvent;
6771
import org.springframework.kafka.event.ConsumerStoppingEvent;
6872
import org.springframework.kafka.event.ListenerContainerIdleEvent;
@@ -132,16 +136,18 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
132136

133137
private final TopicPartitionOffset[] topicPartitions;
134138

135-
private volatile ListenerConsumer listenerConsumer;
136-
137-
private volatile ListenableFuture<?> listenerConsumerFuture;
138-
139139
private String clientIdSuffix;
140140

141141
private Runnable emergencyStop = () -> stop(() -> {
142142
// NOSONAR
143143
});
144144

145+
private volatile ListenerConsumer listenerConsumer;
146+
147+
private volatile ListenableFuture<?> listenerConsumerFuture;
148+
149+
private volatile CountDownLatch startLatch = new CountDownLatch(1);
150+
145151
/**
146152
* Construct an instance with the supplied configuration properties.
147153
* @param consumerFactory the consumer factory.
@@ -320,9 +326,20 @@ protected void doStart() {
320326
ListenerType listenerType = determineListenerType(listener);
321327
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
322328
setRunning(true);
329+
this.startLatch = new CountDownLatch(1);
323330
this.listenerConsumerFuture = containerProperties
324331
.getConsumerTaskExecutor()
325332
.submitListenable(this.listenerConsumer);
333+
try {
334+
if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
335+
this.logger.error("Consumer thread failed to start - does the configured task executor "
336+
+ "have enough threads to support all containers and concurrency?");
337+
publishConsumerFailedToStart();
338+
}
339+
}
340+
catch (@SuppressWarnings("unused") InterruptedException e) {
341+
Thread.currentThread().interrupt();
342+
}
326343
}
327344

328345
private void checkAckMode(ContainerProperties containerProperties) {
@@ -406,6 +423,25 @@ private void publishConsumerStoppedEvent() {
406423
}
407424
}
408425

426+
/**
* Signal that the consumer thread is running by counting down the start latch
* and, if an application event publisher is configured, publish a
* {@link ConsumerStartingEvent}.
*/
private void publishConsumerStartingEvent() {
427+
// Count down first so the waiting start logic is released even when no publisher is configured.
this.startLatch.countDown();
428+
if (getApplicationEventPublisher() != null) {
429+
getApplicationEventPublisher().publishEvent(new ConsumerStartingEvent(this, this.container));
430+
}
431+
}
432+
433+
/**
* Publish a {@link ConsumerStartedEvent} if an application event publisher
* is configured; otherwise do nothing.
*/
private void publishConsumerStartedEvent() {
434+
if (getApplicationEventPublisher() != null) {
435+
getApplicationEventPublisher().publishEvent(new ConsumerStartedEvent(this, this.container));
436+
}
437+
}
438+
439+
/**
* Publish a {@link ConsumerFailedToStartEvent} if an application event publisher
* is configured; otherwise do nothing.
*/
private void publishConsumerFailedToStart() {
440+
if (getApplicationEventPublisher() != null) {
441+
getApplicationEventPublisher().publishEvent(new ConsumerFailedToStartEvent(this, this.container));
442+
}
443+
}
444+
409445
@Override
410446
protected AbstractMessageListenerContainer<?, ?> parentOrThis() {
411447
return this.container;
@@ -833,6 +869,7 @@ public boolean isLongLived() {
833869

834870
@Override
835871
public void run() {
872+
publishConsumerStartingEvent();
836873
this.consumerThread = Thread.currentThread();
837874
if (this.consumerSeekAwareListener != null) {
838875
this.consumerSeekAwareListener.registerSeekCallback(this);
@@ -841,6 +878,7 @@ public void run() {
841878
this.count = 0;
842879
this.last = System.currentTimeMillis();
843880
initAssignedPartitions();
881+
publishConsumerStartedEvent();
844882
while (isRunning()) {
845883
try {
846884
pollAndInvoke();

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,22 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyString;
22+
import static org.mockito.ArgumentMatchers.eq;
2123
import static org.mockito.BDDMockito.given;
2224
import static org.mockito.BDDMockito.willAnswer;
2325
import static org.mockito.Mockito.mock;
2426
import static org.mockito.Mockito.verify;
2527

28+
import java.time.Duration;
2629
import java.util.Arrays;
2730
import java.util.Collection;
2831
import java.util.Collections;
2932
import java.util.HashMap;
3033
import java.util.List;
3134
import java.util.Map;
35+
import java.util.Set;
36+
import java.util.concurrent.ConcurrentHashMap;
3237
import java.util.concurrent.CountDownLatch;
3338
import java.util.concurrent.TimeUnit;
3439
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,7 +50,11 @@
4550
import org.junit.jupiter.api.Test;
4651

4752
import org.springframework.kafka.core.ConsumerFactory;
53+
import org.springframework.kafka.event.ConsumerFailedToStartEvent;
54+
import org.springframework.kafka.event.ConsumerStartedEvent;
55+
import org.springframework.kafka.event.ConsumerStartingEvent;
4856
import org.springframework.kafka.test.utils.KafkaTestUtils;
57+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4958

5059
/**
5160
* @author Gary Russell
@@ -54,6 +63,53 @@
5463
*/
5564
public class ConcurrentMessageListenerContainerMockTests {
5665

66+
/**
* Verify that a container whose task executor has fewer threads than its
* concurrency still runs the consumer that got a thread, and publishes a
* {@link ConsumerFailedToStartEvent} for the one that did not.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
67+
@Test
68+
void testThreadStarvation() throws InterruptedException {
69+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
70+
final Consumer consumer = mock(Consumer.class);
71+
// Names of the threads that actually invoked poll().
Set<String> consumerThreads = ConcurrentHashMap.newKeySet();
72+
// Released after two poll() invocations.
CountDownLatch latch = new CountDownLatch(2);
73+
willAnswer(invocation -> {
74+
consumerThreads.add(Thread.currentThread().getName());
75+
latch.countDown();
76+
Thread.sleep(50);
77+
return new ConsumerRecords<>(Collections.emptyMap());
78+
}).given(consumer).poll(any());
79+
given(consumerFactory.createConsumer(anyString(), anyString(), anyString(),
80+
eq(KafkaTestUtils.defaultPropertyOverrides())))
81+
.willReturn(consumer);
82+
ContainerProperties containerProperties = new ContainerProperties("foo");
83+
containerProperties.setGroupId("grp");
84+
containerProperties.setMessageListener((MessageListener) record -> { });
85+
containerProperties.setMissingTopicsFatal(false);
86+
// Only one executor thread for a concurrency of two.
// NOTE(review): presumably the second consumer task is queued behind the first - confirm executor defaults.
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
87+
exec.setCorePoolSize(1);
88+
exec.afterPropertiesSet();
89+
containerProperties.setConsumerTaskExecutor(exec);
90+
// Short start timeout so the failure is detected quickly.
containerProperties.setConsumerStartTimout(Duration.ofMillis(50));
91+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
92+
containerProperties);
93+
container.setConcurrency(2);
94+
// Counts starting/started events (two expected in total) and a single failed-to-start event.
CountDownLatch startedLatch = new CountDownLatch(2);
95+
CountDownLatch failedLatch = new CountDownLatch(1);
96+
container.setApplicationEventPublisher(event -> {
97+
if (event instanceof ConsumerStartingEvent || event instanceof ConsumerStartedEvent) {
98+
startedLatch.countDown();
99+
}
100+
else if (event instanceof ConsumerFailedToStartEvent) {
101+
failedLatch.countDown();
102+
}
103+
});
104+
container.start();
105+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
106+
// Only one distinct thread has polled.
assertThat(consumerThreads).hasSize(1);
107+
assertThat(startedLatch.await(10, TimeUnit.SECONDS)).isTrue();
108+
assertThat(failedLatch.await(10, TimeUnit.SECONDS)).isTrue();
109+
container.stop();
110+
exec.destroy();
111+
}
112+
57113
@SuppressWarnings({ "rawtypes", "unchecked" })
58114
@Test
59115
void testCorrectContainerForConsumerError() throws InterruptedException {

src/reference/asciidoc/kafka.adoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1813,6 +1813,23 @@ public SeekToCurrentErrorHandler eh() {
18131813

18141814
However, see the note at the beginning of this section; you can avoid using the `RetryTemplate` altogether.
18151815

1816+
[[events]]
1817+
===== Listener Consumer Lifecycle Events
1818+
1819+
The following events are published when containers are started and stopped:
1820+
1821+
* `ConsumerStartingEvent` - published when a consumer thread is first started, before it starts polling.
1822+
* `ConsumerStartedEvent` - published when a consumer is about to start polling.
1823+
* `ConsumerFailedToStartEvent` - published if no `ConsumerStartingEvent` is published within the `consumerStartTimeout` container property.
1824+
This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency.
1825+
An error message is also logged when this condition occurs.
1826+
* `ListenerContainerIdleEvent` - discussed in <<idle-containers>>.
1827+
* `NonResponsiveConsumerEvent` - discussed in <<idle-containers>>.
1828+
* `ConsumerPausedEvent` - discussed in <<pause-resume>>.
1829+
* `ConsumerResumedEvent` - discussed in <<pause-resume>>.
1830+
* `ConsumerStoppingEvent` - published when a consumer begins to stop.
1831+
* `ConsumerStoppedEvent` - published when a consumer is stopped.
1832+
18161833
[[idle-containers]]
18171834
===== Detecting Idle and Non-Responsive Consumers
18181835

0 commit comments

Comments
 (0)