Skip to content

Commit 6a61cad

Browse files
Add getAssignmentsByClientId() to containers
* Remove redundant `final` from `private` method Co-authored-by: Artem Bilan <abilan@pivotal.io>
1 parent b51a778 commit 6a61cad

File tree

6 files changed

+80
-11
lines changed

6 files changed

+80
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,20 @@ public Collection<TopicPartition> getAssignedPartitions() {
109109
}
110110
}
111111

112+
@Override
113+
public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
114+
synchronized (this.lifecycleMonitor) {
115+
Map<String, Collection<TopicPartition>> assignments = new HashMap<>();
116+
this.containers.forEach(container -> {
117+
Map<String, Collection<TopicPartition>> byClientId = container.getAssignmentsByClientId();
118+
if (byClientId != null) {
119+
assignments.putAll(byClientId);
120+
}
121+
});
122+
return assignments;
123+
}
124+
}
125+
112126
@Override
113127
public boolean isContainerPaused() {
114128
synchronized (this.lifecycleMonitor) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,18 @@ else if (partitionsListenerConsumer.assignedPartitions != null) {
245245
}
246246
}
247247

248+
@Override
249+
@Nullable
250+
public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
251+
ListenerConsumer partitionsListenerConsumer = this.listenerConsumer;
252+
if (this.listenerConsumer != null) {
253+
return Collections.singletonMap(partitionsListenerConsumer.getClientId(), getAssignedPartitions());
254+
}
255+
else {
256+
return null;
257+
}
258+
}
259+
248260
@Override
249261
public boolean isContainerPaused() {
250262
return isPaused() && this.listenerConsumer != null && this.listenerConsumer.isConsumerPaused();
@@ -255,11 +267,7 @@ public boolean isContainerPaused() {
255267
ListenerConsumer listenerConsumerForMetrics = this.listenerConsumer;
256268
if (listenerConsumerForMetrics != null) {
257269
Map<MetricName, ? extends Metric> metrics = listenerConsumerForMetrics.consumer.metrics();
258-
Iterator<MetricName> metricIterator = metrics.keySet().iterator();
259-
if (metricIterator.hasNext()) {
260-
String clientId = metricIterator.next().tags().get("client-id");
261-
return Collections.singletonMap(clientId, metrics);
262-
}
270+
return Collections.singletonMap(listenerConsumerForMetrics.getClientId(), metrics);
263271
}
264272
return Collections.emptyMap();
265273
}
@@ -541,6 +549,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
541549

542550
private final Map<TopicPartition, OffsetAndMetadata> commitsDuringRebalance = new HashMap<>();
543551

552+
private final String clientId;
553+
544554
private Map<TopicPartition, OffsetMetadata> definedPartitions;
545555

546556
private int count;
@@ -585,6 +595,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
585595
KafkaMessageListenerContainer.this.clientIdSuffix,
586596
consumerProperties);
587597

598+
this.clientId = determineClientId();
588599
this.transactionTemplate = determineTransactionTemplate();
589600
this.genericListener = listener;
590601
this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
@@ -658,6 +669,19 @@ else if (listener instanceof MessageListener) {
658669
this.subBatchPerPartition = setupSubBatchPerPartition();
659670
}
660671

672+
String getClientId() {
673+
return this.clientId;
674+
}
675+
676+
private String determineClientId() {
677+
Map<MetricName, ? extends Metric> metrics = this.consumer.metrics();
678+
Iterator<MetricName> metricIterator = metrics.keySet().iterator();
679+
if (metricIterator.hasNext()) {
680+
return metricIterator.next().tags().get("client-id");
681+
}
682+
return "unknown.client.id";
683+
}
684+
661685
private void checkGroupInstance(Properties properties, ConsumerFactory<K, V> consumerFactory) {
662686
String groupInstance = properties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
663687
if (!StringUtils.hasText(groupInstance)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,21 @@ default ContainerProperties getContainerProperties() {
6565
* @return the topics/partitions.
6666
* @since 2.1.3
6767
*/
68+
@Nullable
6869
default Collection<TopicPartition> getAssignedPartitions() {
6970
throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
7071
}
7172

73+
/**
74+
* Return the assigned topics/partitions for this container, by client.id.
75+
* @return the topics/partitions.
76+
* @since 2.5
77+
*/
78+
@Nullable
79+
default Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
80+
throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
81+
}
82+
7283
/**
7384
* Pause this container before the next poll().
7485
* @since 2.1.3

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
132132
};
133133
ContainerProperties containerProps = new ContainerProperties(topic1);
134134
containerProps.setLogContainerConfig(true);
135+
containerProps.setClientId("client");
135136

136137
final CountDownLatch latch = new CountDownLatch(3);
137138
final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
@@ -164,6 +165,10 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
164165

165166
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
166167
assertThat(container.getAssignedPartitions()).hasSize(2);
168+
Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
169+
assertThat(assignments).hasSize(2);
170+
assertThat(assignments.get("client-0")).isNotNull();
171+
assertThat(assignments.get("client-1")).isNotNull();
167172

168173
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
169174
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);

src/reference/asciidoc/kafka.adoc

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2299,8 +2299,8 @@ BeforeTx
22992299

23002300
|pause
23012301
Requested
2302-
|n/a
2303-
|True if a consumer pause has been requested (read only).
2302+
|(read only)
2303+
|True if a consumer pause has been requested.
23042304

23052305
|record
23062306
Interceptor
@@ -2323,8 +2323,14 @@ Timeout
23232323

23242324
|assigned
23252325
Partitions
2326-
|n/a
2327-
|The partitions currently assigned to this container (explicitly or not) (read only).
2326+
|(read only)
2327+
|The partitions currently assigned to this container (explicitly or not).
2328+
2329+
|assigned
2330+
Partitions
2331+
ByClientId
2332+
|(read only)
2333+
|The partitions currently assigned to this container (explicitly or not).
23282334

23292335
|clientId
23302336
Suffix
@@ -2345,8 +2351,14 @@ Suffix
23452351

23462352
|assigned
23472353
Partitions
2348-
|n/a
2349-
|The aggregate of partitions currently assigned to this container's child `KafkaMessageListenerContainer` s (explicitly or not) (read only).
2354+
|(read only)
2355+
|The aggregate of partitions currently assigned to this container's child `KafkaMessageListenerContainer` s (explicitly or not).
2356+
2357+
|assigned
2358+
Partitions
2359+
ByClientId
2360+
|(read only)
2361+
|The partitions currently assigned to this container's child `KafkaMessageListenerContainer` s (explicitly or not), keyed by the child container's consumer's `client.id` property.
23502362

23512363
|concurrency
23522364
|1

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ See <<error-handlers>> for more information.
6161
You can now control the level at which exceptions intentionally thrown by standard error handlers are logged.
6262
See <<error-handlers>> for more information.
6363

64+
The `getAssignmentsByClientId()` method has been added, making it easier to determine which consumers in a concurrent container are assigned which partition(s).
65+
See <<container-props>> for more information.
66+
6467
[[x25-template]]
6568
==== KafkaTemplate Changes
6669

0 commit comments

Comments
 (0)