File tree Expand file tree Collapse file tree 4 files changed +44
-5
lines changed
main/java/org/springframework/kafka/config
test/java/org/springframework/kafka/core Expand file tree Collapse file tree 4 files changed +44
-5
lines changed Original file line number Diff line number Diff line change @@ -138,7 +138,7 @@ subprojects { subproject ->
138138 test {
139139 // suppress all console output during testing unless running `gradle -i`
140140 logging. captureStandardOutput(LogLevel . INFO )
141- maxHeapSize = ' 1024m '
141+ maxHeapSize = ' 1536m '
142142 jacoco {
143143 append = false
144144 destinationFile = file(" $buildDir /jacoco.exec" )
Original file line number Diff line number Diff line change 1616
1717package org .springframework .kafka .config ;
1818
19+ import java .util .ArrayList ;
1920import java .util .HashMap ;
2021import java .util .List ;
2122import java .util .Map ;
@@ -73,7 +74,22 @@ public TopicBuilder replicas(int replicaCount) {
7374 * @see NewTopic#replicasAssignments()
7475 */
7576 public TopicBuilder replicasAssignments (Map <Integer , List <Integer >> replicaAssignments ) {
76- this .replicasAssignments = replicaAssignments ;
77+ replicaAssignments .forEach ((part , list ) -> assignReplicas (part , list ));
78+ return this ;
79+ }
80+
81+ /**
82+ * Add an individual replica assignment.
83+ * @param partition the partition.
84+ * @param replicaList the replicas.
85+ * @return the builder.
86+ * @see NewTopic#replicasAssignments()
87+ */
88+ public TopicBuilder assignReplicas (int partition , List <Integer > replicaList ) {
89+ if (this .replicasAssignments == null ) {
90+ this .replicasAssignments = new HashMap <>();
91+ }
92+ this .replicasAssignments .put (partition , new ArrayList <>(replicaList ));
7793 return this ;
7894 }
7995
Original file line number Diff line number Diff line change @@ -64,6 +64,9 @@ public class KafkaAdminTests {
6464 @ Autowired
6565 private NewTopic topic2 ;
6666
67+ @ Autowired
68+ private NewTopic topic3 ;
69+
6770 @ Test
6871 public void testTopicConfigs () {
6972 assertThat (topic1 .configs ()).containsEntry (
@@ -75,6 +78,7 @@ public void testTopicConfigs() {
7578 assertThat (TopicBuilder .name ("foo" )
7679 .replicas (3 )
7780 .build ().replicationFactor ()).isEqualTo ((short ) 3 );
81+ assertThat (topic3 .replicasAssignments ()).hasSize (3 );
7882 }
7983
8084 @ Test
@@ -118,7 +122,7 @@ public static class Config {
118122
119123 @ Bean
120124 public EmbeddedKafkaBroker kafkaEmbedded () {
121- return new EmbeddedKafkaBroker (1 );
125+ return new EmbeddedKafkaBroker (3 );
122126 }
123127
124128 @ Bean
@@ -146,6 +150,16 @@ public NewTopic topic2() {
146150 .build ();
147151 }
148152
153+ @ Bean
154+ public NewTopic topic3 () {
155+ return TopicBuilder .name ("baz" )
156+ .assignReplicas (0 , Arrays .asList (0 , 1 ))
157+ .assignReplicas (1 , Arrays .asList (1 , 2 ))
158+ .assignReplicas (2 , Arrays .asList (2 , 0 ))
159+ .config (TopicConfig .COMPRESSION_TYPE_CONFIG , "zstd" )
160+ .build ();
161+ }
162+
149163 }
150164
151165}
Original file line number Diff line number Diff line change @@ -18,8 +18,7 @@ The following example shows how to do so:
1818@Bean
1919public KafkaAdmin admin() {
2020 Map<String, Object> configs = new HashMap<>();
21- configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
22- StringUtils.arrayToCommaDelimitedString(embeddedKafka().getBrokerAddresses()));
21+ configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
2322 return new KafkaAdmin(configs);
2423}
2524
@@ -40,6 +39,16 @@ public NewTopic topic2() {
4039 .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
4140 .build();
4241}
42+
43+ @Bean
44+ public NewTopic topic3() {
45+ return TopicBuilder.name("thing3")
46+ .assignReplicas(0, Arrays.asList(0, 1))
47+ .assignReplicas(1, Arrays.asList(1, 2))
48+ .assignReplicas(2, Arrays.asList(2, 0))
49+ .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
50+ .build();
51+ }
4352----
4453====
4554
You can’t perform that action at this time.
0 commit comments