Skip to content

Commit 2938174

Browse files
committed
GH-1485: Option to suppress single client.id suffx
See #1485
1 parent 0cbf569 commit 2938174

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
5858

5959
private int concurrency = 1;
6060

61+
private boolean alwaysClientIdSuffix = true;
62+
6163
/**
6264
* Construct an instance with the supplied configuration properties.
6365
* The topic partitions are distributed evenly across the delegate
@@ -86,6 +88,16 @@ public void setConcurrency(int concurrency) {
8688
this.concurrency = concurrency;
8789
}
8890

91+
/**
92+
* Set to false to suppress adding a suffix to the child container's client.id when
93+
* the concurrency is only 1.
94+
* @param alwaysClientIdSuffix false to suppress, true (default) to include.
95+
* @since 2.2.14
96+
*/
97+
public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) {
98+
this.alwaysClientIdSuffix = alwaysClientIdSuffix;
99+
}
100+
89101
/**
90102
* Return the list of {@link KafkaMessageListenerContainer}s created by
91103
* this container.
@@ -166,7 +178,7 @@ protected void doStart() {
166178
if (getApplicationEventPublisher() != null) {
167179
container.setApplicationEventPublisher(getApplicationEventPublisher());
168180
}
169-
container.setClientIdSuffix("-" + i);
181+
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");
170182
container.setGenericErrorHandler(getGenericErrorHandler());
171183
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
172184
container.setRecordInterceptor(getRecordInterceptor());

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.BitSet;
3030
import java.util.Collection;
3131
import java.util.HashSet;
32+
import java.util.Iterator;
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.Set;
@@ -403,6 +404,7 @@ public void testManualCommitSyncExisting() throws Exception {
403404
latch.countDown();
404405
});
405406
containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
407+
containerProps.setClientId("myClientId");
406408

407409
ConcurrentMessageListenerContainer<Integer, String> container =
408410
new ConcurrentMessageListenerContainer<>(cf, containerProps);
@@ -417,6 +419,9 @@ public void testManualCommitSyncExisting() throws Exception {
417419
template.flush();
418420
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
419421
assertThat(bitSet.cardinality()).isEqualTo(8);
422+
Set<String> clientIds = container.metrics().keySet();
423+
assertThat(clientIds).hasSize(1);
424+
assertThat(clientIds.iterator().next()).isEqualTo("myClientId-0");
420425
container.stop();
421426
this.logger.info("Stop MANUAL_IMMEDIATE with Existing");
422427
}
@@ -433,10 +438,11 @@ public void testPausedStart() throws Exception {
433438
ConcurrentMessageListenerContainerTests.this.logger.info("paused start: " + message);
434439
latch.countDown();
435440
});
436-
441+
containerProps.setClientId("myClientId");
437442
ConcurrentMessageListenerContainer<Integer, String> container =
438443
new ConcurrentMessageListenerContainer<>(cf, containerProps);
439444
container.setConcurrency(2);
445+
container.setAlwaysClientIdSuffix(false);
440446
container.setBeanName("testBatch");
441447
container.pause();
442448
container.start();
@@ -455,6 +461,11 @@ public void testPausedStart() throws Exception {
455461
container.resume();
456462

457463
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
464+
Set<String> clientIds = container.metrics().keySet();
465+
assertThat(clientIds).hasSize(2);
466+
Iterator<String> iterator = clientIds.iterator();
467+
assertThat(iterator.next()).startsWith("myClientId-");
468+
assertThat(iterator.next()).startsWith("myClientId-");
458469
container.stop();
459470
this.logger.info("Stop paused start");
460471
}
@@ -636,9 +647,11 @@ private void testAckOnErrorWithManualImmediateGuts(String topic, boolean ackOnEr
636647
}
637648

638649
});
650+
containerProps.setClientId("myClientId");
639651
ConcurrentMessageListenerContainer<Integer, String> container = new ConcurrentMessageListenerContainer<>(cf,
640652
containerProps);
641653
container.setConcurrency(1);
654+
container.setAlwaysClientIdSuffix(false);
642655
container.setBeanName("testAckOnErrorWithManualImmediate");
643656
container.start();
644657
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
@@ -651,6 +664,9 @@ private void testAckOnErrorWithManualImmediateGuts(String topic, boolean ackOnEr
651664
template.sendDefault(0, 1, "bar");
652665
template.flush();
653666
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
667+
Set<String> clientIds = container.metrics().keySet();
668+
assertThat(clientIds).hasSize(1);
669+
assertThat(clientIds.iterator().next()).isEqualTo("myClientId");
654670
container.stop();
655671

656672
Consumer<Integer, String> consumer = cf.createConsumer();

0 commit comments

Comments
 (0)